diff --git a/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBPartitionCreationSupplier.java b/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBPartitionCreationSupplier.java
index fe6e62525d..3125059e1f 100644
--- a/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBPartitionCreationSupplier.java
+++ b/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBPartitionCreationSupplier.java
@@ -73,53 +73,52 @@ private List buildPartitions(final String collectionName) {
         if (collection.size() < 2) {
             throw new IllegalArgumentException("Invalid Collection Name. Must as db.collection format");
         }
-        MongoClient mongoClient = MongoDBHelper.getMongoClient(mongoDBConfig);
-        MongoDatabase db = mongoClient.getDatabase(collection.get(0));
-        MongoCollection col = db.getCollection(collection.get(1));
-
-        int chunkSize = this.mongoDBConfig.getExportConfig().getItemsPerPartition();
-        FindIterable startIterable = col.find()
-                .projection(new Document("_id", 1))
-                .sort(new Document("_id", 1))
-                .limit(1);
-
-        while (true) {
-            try (MongoCursor startCursor = startIterable.iterator()) {
-                if (!startCursor.hasNext()) {
-                    break;
-                }
-                Document startDoc = startCursor.next();
-                Object gteValue = startDoc.get("_id");
-                String className = gteValue.getClass().getName();
-
-                // Get end doc
-                Document endDoc = startIterable.skip(chunkSize - 1).limit(1).first();
-                if (endDoc == null) {
-                    // this means we have reached the end of the doc
-                    endDoc = col.find()
+        try (MongoClient mongoClient = MongoDBHelper.getMongoClient(mongoDBConfig)) {
+            MongoDatabase db = mongoClient.getDatabase(collection.get(0));
+            MongoCollection col = db.getCollection(collection.get(1));
+            int chunkSize = this.mongoDBConfig.getExportConfig().getItemsPerPartition();
+            FindIterable startIterable = col.find()
+                    .projection(new Document("_id", 1))
+                    .sort(new Document("_id", 1))
+                    .limit(1);
+            while (true) {
+                try (MongoCursor startCursor = startIterable.iterator()) {
+                    if (!startCursor.hasNext()) {
+                        break;
+                    }
+                    Document startDoc = startCursor.next();
+                    Object gteValue = startDoc.get("_id");
+                    String className = gteValue.getClass().getName();
+
+                    // Get end doc
+                    Document endDoc = startIterable.skip(chunkSize - 1).limit(1).first();
+                    if (endDoc == null) {
+                        // this means we have reached the end of the doc
+                        endDoc = col.find()
+                                .projection(new Document("_id", 1))
+                                .sort(new Document("_id", -1))
+                                .limit(1)
+                                .first();
+                    }
+                    if (endDoc == null) {
+                        break;
+                    }
+
+                    Object lteValue = endDoc.get("_id");
+                    LOG.info("Chunk of " + collectionName + ": {gte: " + gteValue.toString() + ", lte: " + lteValue.toString() + "}");
+                    collectionPartitions.add(
+                            PartitionIdentifier
+                                    .builder()
+                                    .withPartitionKey(String.format(DOCUMENTDB_PARTITION_KEY_FORMAT, collectionName, gteValue, lteValue, className))
+                                    .build());
+
+                    startIterable = col.find(Filters.gt("_id", lteValue))
                             .projection(new Document("_id", 1))
-                            .sort(new Document("_id", -1))
-                            .limit(1)
-                            .first();
+                            .sort(new Document("_id", 1))
+                            .limit(1);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
                 }
-                if (endDoc == null) {
-                    break;
-                }
-
-                Object lteValue = endDoc.get("_id");
-                LOG.info("Chunk of " + collectionName + ": {gte: " + gteValue.toString() + ", lte: " + lteValue.toString() + "}");
-                collectionPartitions.add(
-                        PartitionIdentifier
-                                .builder()
-                                .withPartitionKey(String.format(DOCUMENTDB_PARTITION_KEY_FORMAT, collectionName, gteValue, lteValue, className))
-                                .build());
-
-                startIterable = col.find(Filters.gt("_id", lteValue))
-                        .projection(new Document("_id", 1))
-                        .sort(new Document("_id", 1))
-                        .limit(1);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
             }
         }
         return collectionPartitions;
diff --git a/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorker.java b/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorker.java
index b28fc21372..108aab0df0 100644
--- a/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorker.java
+++ b/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorker.java
@@ -137,49 +137,48 @@ private void startProcessPartition(SourcePartition
         if (collection.size() < 2) {
             throw new RuntimeException("Invalid Collection Name. Must as db.collection format");
         }
-        MongoClient mongoClient = MongoDBHelper.getMongoClient(mongoDBConfig);
-        MongoDatabase db = mongoClient.getDatabase(collection.get(0));
-        MongoCollection col = db.getCollection(collection.get(1));
-        Bson query = MongoDBHelper.buildAndQuery(gte, lte, className);
-        long totalRecords = 0L;
-        long successRecords = 0L;
-        long failedRecords = 0L;
-        try (MongoCursor cursor = col.find(query).iterator()) {
-            while (cursor.hasNext()) {
-                try {
-                    JsonWriterSettings writerSettings = JsonWriterSettings.builder()
-                            .outputMode(JsonMode.RELAXED)
-                            .objectIdConverter((value, writer) -> writer.writeString(value.toHexString()))
-                            .build();
-                    String record = cursor.next().toJson(writerSettings);
-                    Map data = convertToMap(record);
-                    data.putIfAbsent(EVENT_SOURCE_DB_ATTRIBUTE, collection.get(0));
-                    data.putIfAbsent(EVENT_SOURCE_COLLECTION_ATTRIBUTE, collection.get(1));
-                    data.putIfAbsent(EVENT_SOURCE_OPERATION, OpenSearchBulkActions.CREATE.toString());
-                    data.putIfAbsent(EVENT_SOURCE_TS_MS, 0);
-                    if (buffer.isByteBuffer()) {
-                        buffer.writeBytes(objectMapper.writeValueAsBytes(data), null, DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
-                    } else {
-                        buffer.write(getEventFromData(data), DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
-                    }
-                    successItemsCounter.increment();
-                    successRecords += 1;
-                } catch (Exception e) {
-                    LOG.error("failed to add record to buffer with error {}", e.getMessage());
-                    failureItemsCounter.increment();
-                    failedRecords += 1;
-                } finally {
+        try (MongoClient mongoClient = MongoDBHelper.getMongoClient(mongoDBConfig)) {
+            MongoDatabase db = mongoClient.getDatabase(collection.get(0));
+            MongoCollection col = db.getCollection(collection.get(1));
+            Bson query = MongoDBHelper.buildAndQuery(gte, lte, className);
+            long totalRecords = 0L;
+            long successRecords = 0L;
+            long failedRecords = 0L;
+            try (MongoCursor cursor = col.find(query).iterator()) {
+                while (cursor.hasNext()) {
                     totalRecords += 1;
+                    try {
+                        JsonWriterSettings writerSettings = JsonWriterSettings.builder()
+                                .outputMode(JsonMode.RELAXED)
+                                .objectIdConverter((value, writer) -> writer.writeString(value.toHexString()))
+                                .build();
+                        String record = cursor.next().toJson(writerSettings);
+                        Map data = convertToMap(record);
+                        data.putIfAbsent(EVENT_SOURCE_DB_ATTRIBUTE, collection.get(0));
+                        data.putIfAbsent(EVENT_SOURCE_COLLECTION_ATTRIBUTE, collection.get(1));
+                        data.putIfAbsent(EVENT_SOURCE_OPERATION, OpenSearchBulkActions.CREATE.toString());
+                        data.putIfAbsent(EVENT_SOURCE_TS_MS, 0);
+                        if (buffer.isByteBuffer()) {
+                            buffer.writeBytes(objectMapper.writeValueAsBytes(data), null, DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
+                        } else {
+                            buffer.write(getEventFromData(data), DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
+                        }
+                        successItemsCounter.increment();
+                        successRecords += 1;
+                    } catch (Exception e) {
+                        LOG.error("failed to add record to buffer with error {}", e.getMessage());
+                        failureItemsCounter.increment();
+                        failedRecords += 1;
+                    }
                 }
+                final MongoDBSnapshotProgressState progressState = new MongoDBSnapshotProgressState();
+                progressState.setTotal(totalRecords);
+                progressState.setSuccess(successRecords);
+                progressState.setFailure(failedRecords);
+                sourceCoordinator.saveProgressStateForPartition(partition.getPartitionKey(), progressState);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        } finally {
-            final MongoDBSnapshotProgressState progressState = new MongoDBSnapshotProgressState();
-            progressState.setTotal(totalRecords);
-            progressState.setSuccess(successRecords);
-            progressState.setFailure(failedRecords);
-            sourceCoordinator.saveProgressStateForPartition(partition.getPartitionKey(), progressState);
         }
     }

diff --git a/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBPartitionCreationSupplierTest.java b/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBPartitionCreationSupplierTest.java
index c11256a516..c5133acf6a 100644
--- a/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBPartitionCreationSupplierTest.java
+++ b/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBPartitionCreationSupplierTest.java
@@ -39,6 +39,7 @@
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -107,6 +108,7 @@ public void test_returnPartitionsForCollection() {
         List partitions = testSupplier.apply(globalStateMap);
         // Then dependencies are called
         verify(mongoClient).getDatabase(eq("test"));
+        verify(mongoClient, times(1)).close();
         verify(mongoDatabase).getCollection(eq("collection"));
         // And partitions are created
         assertThat(partitions.size(), is(2));
diff --git a/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBServiceTest.java b/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBServiceTest.java
index b5a6aedb6c..c63cbfcc9c 100644
--- a/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBServiceTest.java
+++ b/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBServiceTest.java
@@ -18,6 +18,7 @@
 import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
 import org.opensearch.dataprepper.plugins.kafkaconnect.configuration.MongoDBConfig;
 
+import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -27,6 +28,7 @@
 import static org.mockito.Mockito.mockConstruction;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 public class MongoDBServiceTest {
@@ -39,9 +41,6 @@ public class MongoDBServiceTest {
     @Mock
     private AcknowledgementSetManager acknowledgementSetManager;
 
-    @Mock
-    private ScheduledExecutorService scheduledExecutorService;
-
     @Mock
     private SourceCoordinator sourceCoordinator;
 
@@ -58,17 +57,18 @@ public void testConstructor() {
     }
 
     @Test
-    public void testStart() {
-        createObjectUnderTest().start();
-        verify(scheduledExecutorService).schedule(any(Runnable.class), eq(0L), eq(TimeUnit.MILLISECONDS));
+    public void testStartAndStop() throws InterruptedException {
+        when(sourceCoordinator.getNextPartition(mongoDBPartitionCreationSupplier)).thenReturn(Optional.empty());
+        MongoDBService testObject = createObjectUnderTest();
+        testObject.start();
+        Thread.sleep(100);
+        testObject.stop();
     }
 
     private MongoDBService createObjectUnderTest() {
-        try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class);
-             final MockedConstruction mockedConstruction = mockConstruction(MongoDBPartitionCreationSupplier.class, (mock, context) -> {
+        try (final MockedConstruction mockedConstruction = mockConstruction(MongoDBPartitionCreationSupplier.class, (mock, context) -> {
             mongoDBPartitionCreationSupplier = mock;
         })) {
-            executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor).thenReturn(scheduledExecutorService);
             return MongoDBService.create(mongoDBConfig, sourceCoordinator, buffer, acknowledgementSetManager, pluginMetrics);
         }
     }
diff --git a/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorkerTest.java b/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorkerTest.java
index 00c33d374b..ef4179d526 100644
--- a/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorkerTest.java
+++ b/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorkerTest.java
@@ -227,7 +227,6 @@ private void mockDependencyAndProcessPartition(String partitionKey, boolean shou
         future.cancel(true);
         assertThat(future.isCancelled(), equalTo(true));
         assertThat(executorService.awaitTermination(1000, TimeUnit.MILLISECONDS), equalTo(true));
-
         if (shouldProcessSucceed) {
             // Verify Results
             verify(cursor, times(2)).next();
@@ -238,6 +237,8 @@ private void mockDependencyAndProcessPartition(String partitionKey, boolean shou
             assertThat(progressStates.get(0).getTotal(), is(2L));
             assertThat(progressStates.get(0).getSuccess(), is(2L));
             assertThat(progressStates.get(0).getFailed(), is(0L));
+
+            verify(mongoClient, times(1)).close();
         }
     }
 }