diff --git a/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBHelper.java b/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBHelper.java
index 150a5e0d41..ab8be30427 100644
--- a/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBHelper.java
+++ b/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBHelper.java
@@ -7,8 +7,14 @@
 
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
+import org.bson.conversions.Bson;
+import org.bson.types.ObjectId;
 import org.opensearch.dataprepper.plugins.kafkaconnect.configuration.MongoDBConfig;
 
+import static com.mongodb.client.model.Filters.and;
+import static com.mongodb.client.model.Filters.gte;
+import static com.mongodb.client.model.Filters.lte;
+
 public class MongoDBHelper {
 
     public static MongoClient getMongoClient(final MongoDBConfig mongoDBConfig) {
@@ -24,4 +30,36 @@ public static MongoClient getMongoClient(final MongoDBConfig mongoDBConfig) {
 
         return MongoClients.create(connectionString);
     }
+
+    public static Bson buildQuery(String gte, String lte, String className) {
+        switch (className) {
+            case "java.lang.Integer":
+                return and(
+                        gte("_id", Integer.parseInt(gte)),
+                        lte("_id", Integer.parseInt(lte))
+                );
+            case "java.lang.Double":
+                return and(
+                        gte("_id", Double.parseDouble(gte)),
+                        lte("_id", Double.parseDouble(lte))
+                );
+            case "java.lang.String":
+                return and(
+                        gte("_id", gte),
+                        lte("_id", lte)
+                );
+            case "java.lang.Long":
+                return and(
+                        gte("_id", Long.parseLong(gte)),
+                        lte("_id", Long.parseLong(lte))
+                );
+            case "org.bson.types.ObjectId":
+                return and(
+                        gte("_id", new ObjectId(gte)),
+                        lte("_id", new ObjectId(lte))
+                );
+            default:
+                throw new RuntimeException("Unexpected _id class supported: " + className);
+        }
+    }
 }
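To see the relocated helper in isolation: `buildQuery` turns the string bounds carried in a partition key into a typed `_id` range filter. A minimal, hypothetical usage sketch (the `BuildQueryExample` class is invented for illustration and assumes the MongoDB Java driver and this plugin package are on the classpath):

```java
import org.bson.conversions.Bson;
import org.opensearch.dataprepper.plugins.kafkaconnect.source.mongoDB.MongoDBHelper;

// Hypothetical illustration, not part of the PR.
public class BuildQueryExample {
    public static void main(String[] args) {
        // Integer bounds produce { "_id": { $gte: 0, $lte: 100 } }.
        Bson intRange = MongoDBHelper.buildQuery("0", "100", "java.lang.Integer");

        // ObjectId bounds are parsed from their 24-character hex form.
        Bson idRange = MongoDBHelper.buildQuery(
                "000000000000000000000000",
                "000000000000000000000001",
                "org.bson.types.ObjectId");

        System.out.println(intRange);
        System.out.println(idRange);
    }
}
```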
diff --git a/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorker.java b/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorker.java
index 348248b99b..fb9f531580 100644
--- a/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorker.java
+++ b/data-prepper-plugins/kafka-connect-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorker.java
@@ -15,7 +15,6 @@
 import io.micrometer.core.instrument.Counter;
 import org.bson.Document;
 import org.bson.conversions.Bson;
-import org.bson.types.ObjectId;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
 import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -35,16 +34,13 @@
 import java.util.Map;
 import java.util.Optional;
 
-import static com.mongodb.client.model.Filters.and;
-import static com.mongodb.client.model.Filters.gte;
-import static com.mongodb.client.model.Filters.lte;
-
 public class MongoDBSnapshotWorker implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(MongoDBSnapshotWorker.class);
     private static final Duration BACKOFF_ON_EXCEPTION = Duration.ofSeconds(60);
     private static final Duration BACKOFF_ON_EMPTY_PARTITION = Duration.ofSeconds(60);
     private static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(2);
     private static final String SUCCESS_ITEM_COUNTER_NAME = "exportRecordsSuccessTotal";
+    private static final String FAILURE_ITEM_COUNTER_NAME = "exportRecordsFailedTotal";
     private static final String SUCCESS_PARTITION_COUNTER_NAME = "exportPartitionSuccessTotal";
     private static final String FAILURE_PARTITION_COUNTER_NAME = "exportPartitionFailureTotal";
     private static final String EVENT_SOURCE_COLLECTION_ATTRIBUTE = "__collection";
@@ -58,10 +54,12 @@ public class MongoDBSnapshotWorker implements Runnable {
     private final AcknowledgementSetManager acknowledgementSetManager;
     private final MongoDBConfig mongoDBConfig;
    private final Counter successItemsCounter;
+    private final Counter failureItemsCounter;
     private final Counter successPartitionCounter;
     private final Counter failureParitionCounter;
     private final ObjectMapper objectMapper = new ObjectMapper();
-    private final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>() {};
+    private final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>() {
+    };
 
     public MongoDBSnapshotWorker(final SourceCoordinator<MongoDBSnapshotProgressState> sourceCoordinator,
@@ -76,6 +74,7 @@ public MongoDBSnapshotWorker(final SourceCoordinator<MongoDBSnapshotProgressState> sourceCoordinator,
         this.acknowledgementSetManager = acknowledgementSetManager;
         this.mongoDBConfig = mongoDBConfig;
         this.successItemsCounter = pluginMetrics.counter(SUCCESS_ITEM_COUNTER_NAME);
+        this.failureItemsCounter = pluginMetrics.counter(FAILURE_ITEM_COUNTER_NAME);
         this.successPartitionCounter = pluginMetrics.counter(SUCCESS_PARTITION_COUNTER_NAME);
         this.failureParitionCounter = pluginMetrics.counter(FAILURE_PARTITION_COUNTER_NAME);
     }
@@ ... @@ private void startProcessPartition(SourcePartition<MongoDBSnapshotProgressState> partition) {
         MongoClient mongoClient = MongoDBHelper.getMongoClient(mongoDBConfig);
         MongoDatabase db = mongoClient.getDatabase(collection.get(0));
         MongoCollection<Document> col = db.getCollection(collection.get(1));
-        Bson query = this.buildQuery(gte, lte, className);
+        Bson query = MongoDBHelper.buildQuery(gte, lte, className);
         MongoCursor<Document> cursor = col.find(query).iterator();
         int totalRecords = 0;
         try {
             while (cursor.hasNext()) {
                 String record = cursor.next().toJson();
-                Map<String, Object> data = convertToMap(record);
-                data.putIfAbsent(EVENT_SOURCE_DB_ATTRIBUTE, collection.get(0));
-                data.putIfAbsent(EVENT_SOURCE_COLLECTION_ATTRIBUTE, collection.get(1));
-                data.putIfAbsent(EVENT_SOURCE_OPERATION, OpenSearchBulkActions.CREATE.toString());
-                if (buffer.isByteBuffer()) {
-                    buffer.writeBytes(objectMapper.writeValueAsBytes(data), null, DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
-                } else {
-                    buffer.write(getEventFromData(data), DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
+                try {
+                    Map<String, Object> data = convertToMap(record);
+                    data.putIfAbsent(EVENT_SOURCE_DB_ATTRIBUTE, collection.get(0));
+                    data.putIfAbsent(EVENT_SOURCE_COLLECTION_ATTRIBUTE, collection.get(1));
+                    data.putIfAbsent(EVENT_SOURCE_OPERATION, OpenSearchBulkActions.CREATE.toString());
+                    if (buffer.isByteBuffer()) {
+                        buffer.writeBytes(objectMapper.writeValueAsBytes(data), null, DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
+                    } else {
+                        buffer.write(getEventFromData(data), DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
+                    }
+                } catch (JsonProcessingException e) {
+                    LOG.error("failed to add record to buffer with error {}", e.getMessage());
+                    failureItemsCounter.increment();
+                    continue;
                 }
                 successItemsCounter.increment();
                 totalRecords += 1;
@@ -166,39 +171,6 @@ private void startProcessPartition(SourcePartition<MongoDBSnapshotProgressState> partition) {
         sourceCoordinator.saveProgressStateForPartition(partition.getPartitionKey(), progressState);
     }
 
-    private Bson buildQuery(String gte, String lte, String className) {
-        switch (className) {
-            case "java.lang.Integer":
-                return and(
-                        gte("_id", Integer.parseInt(gte)),
-                        lte("_id", Integer.parseInt(lte))
-                );
-            case "java.lang.Double":
-                return and(
-                        gte("_id", Double.parseDouble(gte)),
-                        lte("_id", Double.parseDouble(lte))
-                );
-            case "java.lang.String":
-                return and(
-                        gte("_id", gte),
-                        lte("_id", lte)
-                );
-            case "java.lang.Long":
-                return and(
-                        gte("_id", Long.parseLong(gte)),
-                        lte("_id", Long.parseLong(lte))
-                );
-            case "org.bson.types.ObjectId":
-                return and(
-                        gte("_id", new ObjectId(gte)),
-                        lte("_id", new ObjectId(lte))
-                );
-            default:
-                throw new RuntimeException("Unexpected _id class supported: " + className);
-        }
-
-    }
-
     private Optional<AcknowledgementSet> createAcknowledgementSet(SourcePartition<MongoDBSnapshotProgressState> partition) {
         if (mongoDBConfig.getExportConfig().getAcknowledgements()) {
             return Optional.of(this.acknowledgementSetManager.create((result) -> {
@@ -210,12 +182,8 @@ private Optional<AcknowledgementSet> createAcknowledgementSet(SourcePartition<MongoDBSnapshotProgressState> partition) {
-    private Map<String, Object> convertToMap(String jsonData) {
-        try {
-            return objectMapper.readValue(jsonData, mapTypeReference);
-        } catch (JsonProcessingException e) {
-            return null;
-        }
+    private Map<String, Object> convertToMap(String jsonData) throws JsonProcessingException {
+        return objectMapper.readValue(jsonData, mapTypeReference);
     }
 
     private Record<Object> getEventFromData(Map<String, Object> data) {
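The behavioral change worth noting: `convertToMap` used to swallow `JsonProcessingException` and return `null`, which would then blow up at `data.putIfAbsent(...)` and fail the whole partition. Now a bad document is counted via `exportRecordsFailedTotal` and skipped. A standalone sketch of that catch-count-continue pattern (only the ObjectMapper part is real; buffer and counter calls stand in as comments):

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;
import java.util.Map;

// Hypothetical illustration of the per-record error handling introduced above.
public class PerRecordErrorHandlingSketch {
    public static void main(String[] args) {
        final ObjectMapper objectMapper = new ObjectMapper();
        int success = 0;
        int failed = 0;
        // "{bad json" stands in for a document whose JSON conversion fails.
        for (String record : List.of("{\"_id\": 0}", "{bad json", "{\"_id\": 1}")) {
            try {
                Map<?, ?> data = objectMapper.readValue(record, Map.class);
                // buffer.write(...) would happen here in the worker
                success++;
            } catch (JsonProcessingException e) {
                // failureItemsCounter.increment() in the worker
                failed++;
                continue; // mirrors the worker: skip the record, keep the partition alive
            }
        }
        System.out.println("success=" + success + ", failed=" + failed); // success=2, failed=1
    }
}
```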
gte("_id", Long.parseLong(gte)), - lte("_id", Long.parseLong(lte)) - ); - case "org.bson.types.ObjectId": - return and( - gte("_id", new ObjectId(gte)), - lte("_id", new ObjectId(lte)) - ); - default: - throw new RuntimeException("Unexpected _id class supported: " + className); - } - - } - private Optional createAcknowledgementSet(SourcePartition partition) { if (mongoDBConfig.getExportConfig().getAcknowledgements()) { return Optional.of(this.acknowledgementSetManager.create((result) -> { @@ -210,12 +182,8 @@ private Optional createAcknowledgementSet(SourcePartition convertToMap(String jsonData) { - try { - return objectMapper.readValue(jsonData, mapTypeReference); - } catch (JsonProcessingException e) { - return null; - } + private Map convertToMap(String jsonData) throws JsonProcessingException { + return objectMapper.readValue(jsonData, mapTypeReference); } private Record getEventFromData(Map data) { diff --git a/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBPartitionCreationSupplierTest.java b/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBPartitionCreationSupplierTest.java index bd42460292..4a47464da9 100644 --- a/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBPartitionCreationSupplierTest.java +++ b/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBPartitionCreationSupplierTest.java @@ -58,7 +58,7 @@ public void setup() { collectionConfig = mock(MongoDBConfig.CollectionConfig.class); lenient().when(collectionConfig.getCollectionName()).thenReturn(TEST_COLLECTION_NAME); lenient().when(mongoDBConfig.getCollections()).thenReturn(Collections.singletonList(collectionConfig)); - lenient().when(mongoDBConfig.getCredentialsConfig()).thenReturn(new CredentialsConfig(new CredentialsConfig.PlainText("", ""), null)); + lenient().when(mongoDBConfig.getCredentialsConfig()).thenReturn(new CredentialsConfig(new CredentialsConfig.PlainText("user", "user"), null)); lenient().when(mongoDBConfig.getExportConfig()).thenReturn(new MongoDBConfig.ExportConfig()); testSupplier = new MongoDBPartitionCreationSupplier(mongoDBConfig); } diff --git a/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorkerTest.java b/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorkerTest.java new file mode 100644 index 0000000000..02bfa573e9 --- /dev/null +++ b/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorkerTest.java @@ -0,0 +1,230 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.kafkaconnect.source.mongoDB; + +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import io.micrometer.core.instrument.Counter; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import 
diff --git a/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorkerTest.java b/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorkerTest.java
new file mode 100644
index 0000000000..02bfa573e9
--- /dev/null
+++ b/data-prepper-plugins/kafka-connect-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafkaconnect/source/mongoDB/MongoDBSnapshotWorkerTest.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.kafkaconnect.source.mongoDB;
+
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
+import io.micrometer.core.instrument.Counter;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
+import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
+import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
+import org.opensearch.dataprepper.model.source.coordinator.SourcePartition;
+import org.opensearch.dataprepper.plugins.kafkaconnect.configuration.CredentialsConfig;
+import org.opensearch.dataprepper.plugins.kafkaconnect.configuration.MongoDBConfig;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class MongoDBSnapshotWorkerTest {
+    @Mock
+    private SourceCoordinator<MongoDBSnapshotProgressState> sourceCoordinator;
+    @Mock
+    private Buffer<Record<Object>> buffer;
+    @Mock
+    private MongoDBPartitionCreationSupplier mongoDBPartitionCreationSupplier;
+    @Mock
+    private PluginMetrics pluginMetrics;
+    @Mock
+    private AcknowledgementSetManager acknowledgementSetManager;
+    @Mock
+    private MongoDBConfig mongoDBConfig;
+    @Mock
+    private SourcePartition<MongoDBSnapshotProgressState> sourcePartition;
+    @Mock
+    private Counter counter;
+    private MongoDBSnapshotWorker testWorker;
+    private ExecutorService executorService;
+
+
+    @BeforeEach
+    public void setup() throws TimeoutException {
+        lenient().when(mongoDBConfig.getExportConfig()).thenReturn(new MongoDBConfig.ExportConfig());
+        lenient().when(mongoDBConfig.getCredentialsConfig()).thenReturn(new CredentialsConfig(new CredentialsConfig.PlainText("user", "user"), null));
+        lenient().when(buffer.isByteBuffer()).thenReturn(false);
+        lenient().doNothing().when(buffer).write(any(), anyInt());
+        lenient().doNothing().when(sourceCoordinator).saveProgressStateForPartition(anyString(), any());
+        lenient().when(pluginMetrics.counter(anyString())).thenReturn(counter);
+        executorService = Executors.newSingleThreadExecutor();
+        testWorker = new MongoDBSnapshotWorker(sourceCoordinator, buffer, mongoDBPartitionCreationSupplier, pluginMetrics, acknowledgementSetManager, mongoDBConfig);
+    }
+
+    @Test
+    public void test_shouldSleepIfNoPartitionRetrieved() throws InterruptedException {
+        when(sourceCoordinator.getNextPartition(mongoDBPartitionCreationSupplier)).thenReturn(Optional.empty());
+        final Future<?> future = executorService.submit(() -> testWorker.run());
+        Thread.sleep(100);
+        executorService.shutdown();
+        future.cancel(true);
+        assertThat(future.isCancelled(), equalTo(true));
+        assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true));
+    }
+
+    @ParameterizedTest
+    @CsvSource({
+            "test.collection|0|1|java.lang.Integer",
+            "test.collection|0|1|java.lang.Double",
+            "test.collection|0|1|java.lang.String",
+            "test.collection|0|1|java.lang.Long",
+            "test.collection|000000000000000000000000|000000000000000000000001|org.bson.types.ObjectId"
+    })
+    public void test_shouldProcessPartitionSuccess(final String partitionKey) throws InterruptedException, TimeoutException {
+        this.mockDependencyAndProcessPartition(partitionKey, true);
+
+        final ArgumentCaptor<Record<Object>> ingestDataCapture = ArgumentCaptor.forClass(Record.class);
+        verify(buffer, times(2)).write(ingestDataCapture.capture(), anyInt());
+        List<Record<Object>> capturedData = ingestDataCapture.getAllValues();
+        String data1 = ((Event) capturedData.get(0).getData()).jsonBuilder().includeTags(null).toJsonString();
+        String data2 = ((Event) capturedData.get(1).getData()).jsonBuilder().includeTags(null).toJsonString();
+        assertThat(data1, is("{\"_id\":0,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\"}"));
+        assertThat(data2, is("{\"_id\":1,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\"}"));
+    }
+
+    @Test
+    public void test_shouldProcessPartitionSuccess_byteBuffer() throws Exception {
+        when(buffer.isByteBuffer()).thenReturn(true);
+        doNothing().when(buffer).writeBytes(any(byte[].class), any(), anyInt());
+        this.mockDependencyAndProcessPartition("test.collection|0|1|java.lang.Integer", true);
+
+        final ArgumentCaptor<byte[]> ingestDataCapture = ArgumentCaptor.forClass(byte[].class);
+        verify(buffer, times(2)).writeBytes(ingestDataCapture.capture(), any(), anyInt());
+        List<byte[]> capturedData = ingestDataCapture.getAllValues();
+        String data1 = new String(capturedData.get(0));
+        String data2 = new String(capturedData.get(1));
+        assertThat(data1, is("{\"_id\":0,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\"}"));
+        assertThat(data2, is("{\"_id\":1,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\"}"));
+    }
+
+    @Test
+    public void test_shouldProcessPartitionSuccess_ackEnabled() throws InterruptedException, TimeoutException {
+        MongoDBConfig.ExportConfig exportConfig = mock(MongoDBConfig.ExportConfig.class);
+        when(exportConfig.getAcknowledgements()).thenReturn(true);
+        when(mongoDBConfig.getExportConfig()).thenReturn(exportConfig);
+        final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class);
+        doAnswer(invocation -> {
+            Consumer<Boolean> consumer = invocation.getArgument(0);
+            consumer.accept(true);
+            return acknowledgementSet;
+        }).when(acknowledgementSetManager).create(any(Consumer.class), any());
+        doNothing().when(sourceCoordinator).updatePartitionForAcknowledgmentWait(anyString(), any());
+
+        this.mockDependencyAndProcessPartition("test.collection|0|1|java.lang.Integer", true);
+
+        final ArgumentCaptor<Record<Object>> ingestDataCapture = ArgumentCaptor.forClass(Record.class);
+        verify(buffer, times(2)).write(ingestDataCapture.capture(), anyInt());
+        List<Record<Object>> capturedData = ingestDataCapture.getAllValues();
+        String data1 = ((Event) capturedData.get(0).getData()).jsonBuilder().includeTags(null).toJsonString();
+        String data2 = ((Event) capturedData.get(1).getData()).jsonBuilder().includeTags(null).toJsonString();
+        assertThat(data1, is("{\"_id\":0,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\"}"));
+        assertThat(data2, is("{\"_id\":1,\"__source_db\":\"test\",\"__collection\":\"collection\",\"__op\":\"create\"}"));
+    }
+
+    @Test
+    public void test_shouldGiveUpPartitionIfExceptionOccurred() throws InterruptedException {
+        doNothing().when(sourceCoordinator).giveUpPartitions();
+        this.mockDependencyAndProcessPartition("invalidPartition", false);
+        verify(sourceCoordinator, times(1)).giveUpPartitions();
+    }
+
+    @Test
+    public void test_shouldThreadSleepIfExceptionOccurred() throws InterruptedException {
+        doThrow(new RuntimeException("")).when(sourceCoordinator).getNextPartition(mongoDBPartitionCreationSupplier);
+        final Future<?> future = executorService.submit(() -> testWorker.run());
+        Thread.sleep(100);
+        executorService.shutdown();
+        future.cancel(true);
+        assertThat(future.isCancelled(), equalTo(true));
+        assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true));
+    }
+
+    private void mockDependencyAndProcessPartition(String partitionKey, boolean shouldProcessSucceed) throws InterruptedException {
+        lenient().when(sourcePartition.getPartitionKey()).thenReturn(partitionKey);
+        lenient().doNothing().when(sourceCoordinator).completePartition(anyString(), anyBoolean());
+        lenient().when(sourceCoordinator.getNextPartition(mongoDBPartitionCreationSupplier))
+                .thenReturn(Optional.of(sourcePartition))
+                .thenReturn(Optional.empty());
+
+        MongoClient mongoClient = mock(MongoClient.class);
+        MongoDatabase mongoDatabase = mock(MongoDatabase.class);
+        MongoCollection<Document> col = mock(MongoCollection.class);
+        FindIterable<Document> findIterable = mock(FindIterable.class);
+        MongoCursor<Document> cursor = mock(MongoCursor.class);
+        lenient().when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase);
+        lenient().when(mongoDatabase.getCollection(anyString())).thenReturn(col);
+        lenient().when(col.find(any(Bson.class))).thenReturn(findIterable);
+        lenient().when(findIterable.iterator()).thenReturn(cursor);
+        lenient().when(cursor.hasNext()).thenReturn(true, true, false);
+        lenient().when(cursor.next())
+                .thenReturn(new Document("_id", 0))
+                .thenReturn(new Document("_id", 1));
+
+        final Future<?> future = executorService.submit(() -> {
+            try (MockedStatic<MongoClients> mockedMongoClientsStatic = mockStatic(MongoClients.class)) {
+                mockedMongoClientsStatic.when(() -> MongoClients.create(anyString())).thenReturn(mongoClient);
+                testWorker.run();
+            }
+        });
+        Thread.sleep(1000);
+        executorService.shutdown();
+        future.cancel(true);
+        assertThat(future.isCancelled(), equalTo(true));
+        assertThat(executorService.awaitTermination(1000, TimeUnit.MILLISECONDS), equalTo(true));
+
+        if (shouldProcessSucceed) {
+            // Verify Results
+            verify(cursor, times(2)).next();
+
+            final ArgumentCaptor<MongoDBSnapshotProgressState> progressStateCapture = ArgumentCaptor.forClass(MongoDBSnapshotProgressState.class);
+            verify(sourceCoordinator, times(1)).saveProgressStateForPartition(eq(partitionKey), progressStateCapture.capture());
+            List<MongoDBSnapshotProgressState> progressStates = progressStateCapture.getAllValues();
+            assertThat(progressStates.get(0).getTotal(), is(2));
+        }
+    }
+}
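For reference, the partition key format exercised by the `@CsvSource` cases is `db.collection|gte|lte|idClassName`. A small sketch of how such a key decomposes (the split logic here is illustrative; the authoritative parsing lives in `MongoDBSnapshotWorker`):

```java
import java.util.List;

// Hypothetical illustration of decoding a partition key from the test cases above.
public class PartitionKeyFormatSketch {
    public static void main(String[] args) {
        String partitionKey = "test.collection|0|1|java.lang.Integer";
        List<String> parts = List.of(partitionKey.split("\\|"));
        List<String> collection = List.of(parts.get(0).split("\\."));
        System.out.println("db=" + collection.get(0));         // "test" -> __source_db
        System.out.println("collection=" + collection.get(1)); // "collection" -> __collection
        System.out.println("gte=" + parts.get(1));             // lower _id bound
        System.out.println("lte=" + parts.get(2));             // upper _id bound
        System.out.println("idClass=" + parts.get(3));         // drives MongoDBHelper.buildQuery
    }
}
```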