diff --git a/CHANGELOG.md b/CHANGELOG.md
index f2207025..6eb11e55 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,8 @@
 ## Release History
+### 1.11.0-Beta.1 (Unreleased)
+#### Other Changes
+* Construct `SourceRecord` offset based on `_lsn` from the item in `CosmosDBSourceConnector`. [PR 534](https://github.com/microsoft/kafka-connect-cosmosdb/pull/534)
+
 ### 1.10.0 (2023-10-10)
 #### New Features
 * Added compression feature to resolve duplicate records in a single batch when consuming from kafka topic in the bulk mode for sink connector through new config `connect.cosmos.sink.bulk.compression.enabled`. [PR 515](https://github.com/microsoft/kafka-connect-cosmosdb/pull/515)
diff --git a/pom.xml b/pom.xml
index ab6a50bf..5567b949 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
 
     <groupId>com.azure.cosmos.kafka</groupId>
     <artifactId>kafka-connect-cosmos</artifactId>
-    <version>1.10.0</version>
+    <version>1.11.0-Beta.1</version>
 
     <name> kafka-connect-cosmos</name>
     <url>https://github.com/microsoft/kafka-connect-cosmosdb</url>
diff --git a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
index 5e3c2757..bb87e758 100644
--- a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
+++ b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
@@ -17,7 +17,6 @@
 import com.azure.cosmos.models.CosmosContainerProperties;
 import com.azure.cosmos.models.CosmosContainerRequestOptions;
 import com.azure.cosmos.models.CosmosContainerResponse;
-import com.azure.cosmos.models.CosmosQueryRequestOptions;
 import com.azure.cosmos.models.ThroughputProperties;
 import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.connect.data.Schema;
@@ -44,7 +43,7 @@ public class CosmosDBSourceTask extends SourceTask {
 
     private static final Logger logger = LoggerFactory.getLogger(CosmosDBSourceTask.class);
     private static final String OFFSET_KEY = "recordContinuationToken";
-    private static final String CONTINUATION_TOKEN = "ContinuationToken";
+    private static final String LSN_ATTRIBUTE_NAME = "_lsn";
 
     private final AtomicBoolean running = new AtomicBoolean(false);
     private CosmosAsyncClient client = null;
@@ -105,26 +104,8 @@ public void start(Map<String, String> map) {
         logger.info("Started CosmosDB source task.");
     }
 
-    private JsonNode getLeaseContainerRecord() {
-        String sql = "SELECT * FROM c WHERE IS_DEFINED(c.Owner)";
-        Iterable<JsonNode> filteredDocs = leaseContainer.queryItems(sql, new CosmosQueryRequestOptions(), JsonNode.class).toIterable();
-        if (filteredDocs.iterator().hasNext()) {
-            JsonNode result = filteredDocs.iterator().next();
-            // Return node only if it has the continuation token field present
-            if (result.has(CONTINUATION_TOKEN)) {
-                return result;
-            }
-        }
-
-        return null;
-    }
-
-    private String getContinuationToken() {
-        JsonNode leaseRecord = getLeaseContainerRecord();
-        if (client == null || leaseRecord == null) {
-            return null;
-        }
-        return leaseRecord.get(CONTINUATION_TOKEN).textValue();
+    private String getItemLsn(JsonNode item) {
+        return item.get(LSN_ATTRIBUTE_NAME).asText();
     }
 
     @Override
@@ -171,10 +152,8 @@ private void fillRecords(List<SourceRecord> records, String topic) throws Interr
                     messageKey = (messageKeyFieldNode != null) ? messageKeyFieldNode.toString() : "";
                 }
 
-                // Get the latest token and record as offset
-                // TODO: The continuationToken here is picked from any lease with owner, so maybe a little bit random
-                // change to show the continuationToken for the leases processed by the current worker
-                Map<String, Object> sourceOffset = singletonMap(OFFSET_KEY, getContinuationToken());
+                // Get the latest lsn and record as offset
+                Map<String, Object> sourceOffset = singletonMap(OFFSET_KEY, getItemLsn(node));
 
                 if (logger.isDebugEnabled()) {
                     logger.debug("Latest offset is {}.", sourceOffset.get(OFFSET_KEY));
diff --git a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
index 6d9f3320..fe6c05d6 100644
--- a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
+++ b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
@@ -35,6 +35,7 @@ public class CosmosDBSourceTaskTest {
     private final String topicName = "testtopic";
     private final String containerName = "container666";
     private final String databaseName = "fakeDatabase312";
+    private final String OFFSET_KEY = "recordContinuationToken";
     private CosmosAsyncClient mockCosmosClient;
     private CosmosAsyncContainer mockFeedContainer;
     private CosmosAsyncContainer mockLeaseContainer;
@@ -97,7 +98,7 @@ public void setup() throws IllegalAccessException {
 
     @Test
     public void testHandleChanges() throws JsonProcessingException, IllegalAccessException, InterruptedException {
-        String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
+        String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
         ObjectMapper mapper = new ObjectMapper();
         JsonNode actualObj = mapper.readTree(jsonString);
         List<JsonNode> changes = new ArrayList<>();
@@ -124,7 +125,7 @@ public void testHandleChanges() throws JsonProcessingException, IllegalAccessExc
 
     @Test
     public void testPoll() throws InterruptedException, JsonProcessingException, IllegalAccessException {
-        String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
+        String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
         ObjectMapper mapper = new ObjectMapper();
         JsonNode actualObj = mapper.readTree(jsonString);
         List<JsonNode> changes = new ArrayList<>();
@@ -144,7 +145,7 @@ public void testPoll() throws InterruptedException, JsonProcessingException, Ill
     @Test
     public void testPoll_shouldFillMoreRecordsFalse() throws InterruptedException, JsonProcessingException, IllegalAccessException {
         // test when should fillMoreRecords false, then poll method will return immediately
-        String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
+        String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
         ObjectMapper mapper = new ObjectMapper();
         JsonNode actualObj = mapper.readTree(jsonString);
 
@@ -168,7 +169,7 @@ public void testPoll_shouldFillMoreRecordsFalse() throws InterruptedException, J
 
     @Test
     public void testPollWithMessageKey() throws InterruptedException, JsonProcessingException {
-        String jsonString = "{\"id\":123,\"k1\":\"v1\",\"k2\":\"v2\"}";
+        String jsonString = "{\"id\":123,\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
         ObjectMapper mapper = new ObjectMapper();
         JsonNode actualObj = mapper.readTree(jsonString);
         List<JsonNode> changes = new ArrayList<>();
@@ -183,9 +184,27 @@ public void testPollWithMessageKey() throws InterruptedException, JsonProcessing
         Assert.assertEquals("123", result.get(0).key());
     }
 
+    @Test
+    public void testSourceRecordOffset() throws InterruptedException, JsonProcessingException {
+        String jsonString = "{\"id\":123,\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode actualObj = mapper.readTree(jsonString);
+        List<JsonNode> changes = new ArrayList<>();
+        changes.add(actualObj);
+
+        new Thread(() -> {
+            testTask.handleCosmosDbChanges(changes);
+        }).start();
+
+        List<SourceRecord> results = testTask.poll();
+        Assert.assertEquals(1, results.size());
+        Assert.assertEquals("123", results.get(0).key());
+        Assert.assertEquals("2", results.get(0).sourceOffset().get(OFFSET_KEY));
+    }
+
     @Test
     public void testZeroBatchSize() throws InterruptedException, JsonProcessingException, IllegalAccessException {
-        String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
+        String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
         ObjectMapper mapper = new ObjectMapper();
         JsonNode actualObj = mapper.readTree(jsonString);
         List<JsonNode> changes = new ArrayList<>();
@@ -204,7 +223,7 @@ public void testZeroBatchSize() throws InterruptedException, JsonProcessingExcep
 
     @Test
     public void testSmallBufferSize() throws InterruptedException, JsonProcessingException, IllegalAccessException {
-        String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
+        String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
         ObjectMapper mapper = new ObjectMapper();
         JsonNode actualObj = mapper.readTree(jsonString);
         List<JsonNode> changes = new ArrayList<>();
@@ -224,7 +243,7 @@ public void testSmallBufferSize() throws InterruptedException, JsonProcessingExc
 
     @Test(expected=IllegalStateException.class)
     public void testEmptyAssignedContainerThrowsIllegalStateException() throws InterruptedException, JsonProcessingException, IllegalAccessException {
-        String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
+        String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
         ObjectMapper mapper = new ObjectMapper();
         JsonNode actualObj = mapper.readTree(jsonString);
         List<JsonNode> changes = new ArrayList<>();