diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8f9f87bc..185b0bf6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
## Release History
+### 1.14.1 (2024-02-29)
+#### Key Bug Fixes
+* Fixed `NullPointerException` in `CosmosDBSourceConnector`. [PR 555](https://github.com/microsoft/kafka-connect-cosmosdb/pull/555)
+
### 1.14.0 (2024-02-28)
#### New Features
* Updated `azure-cosmos` version to 4.56.0.
diff --git a/pom.xml b/pom.xml
index 61f90e50..78d12178 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
com.azure.cosmos.kafka
kafka-connect-cosmos
- 1.14.0
+ 1.14.1
kafka-connect-cosmos
https://github.com/microsoft/kafka-connect-cosmosdb
diff --git a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
index 0bc8d426..39e8f646 100644
--- a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
+++ b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
@@ -63,8 +63,9 @@ public String version() {
@Override
public void start(Map map) {
- logger.info("Worker {} Starting CosmosDBSourceTask.", this.config.getWorkerName());
- config = new CosmosDBSourceConfig(map);
+ logger.info("Starting CosmosDBSourceTask.");
+ config = new CosmosDBSourceConfig(map);
+
this.queue = new LinkedTransferQueue<>();
logger.info("Worker {} Creating the client.", this.config.getWorkerName());
diff --git a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
index dd5b0849..90020b72 100644
--- a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
+++ b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
@@ -9,6 +9,7 @@
import static org.mockito.Mockito.when;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -31,6 +32,8 @@
import org.mockito.Mockito;
public class CosmosDBSourceTaskTest {
+ private final static String COSMOS_EMULATOR_KEY = "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
+ private final static String COSMOS_EMULATOR_HOST = "https://localhost:8081/";
private CosmosDBSourceTask testTask;
private final String topicName = "testtopic";
private final String containerName = "container666";
@@ -96,6 +99,17 @@ public void setup() throws IllegalAccessException {
}
+ @Test(expected = IllegalArgumentException.class)
+ public void start() {
+ CosmosDBSourceTask testTask = new CosmosDBSourceTask();
+ HashMap configs = new HashMap<>();
+ configs.put(CosmosDBSourceConfig.COSMOS_CONN_ENDPOINT_CONF, COSMOS_EMULATOR_HOST);
+ configs.put(CosmosDBSourceConfig.COSMOS_CONN_KEY_CONF, COSMOS_EMULATOR_KEY);
+ configs.put(CosmosDBSourceConfig.COSMOS_DATABASE_NAME_CONF, "mydb");
+ configs.put(CosmosDBSourceConfig.COSMOS_CONTAINER_TOPIC_MAP_CONF, "mytopic5#mycontainer6");
+ testTask.start(sourceSettings);
+ }
+
@Test
public void testHandleChanges() throws JsonProcessingException, IllegalAccessException, InterruptedException {
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";