fix issue where mongoDBClient is not closed.
Signed-off-by: Haidong <[email protected]>
Haidong committed Nov 15, 2023
1 parent a3cb0e8 commit 7a899f3
Showing 5 changed files with 96 additions and 95 deletions.
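
In both production classes the MongoClient obtained from MongoDBHelper was created but never closed, so every export pass leaked a client, along with the driver's connection pool and background monitor threads. The fix is the same in both places: acquire the client with try-with-resources. A minimal sketch of the before/after pattern, using the stock driver factory rather than the project's helper (URI, database, and collection names are placeholders):

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;

public class ClientLifecycleSketch {

    // Before: the client is never closed, so its connection pool and
    // background monitor threads leak on every call.
    static long leaky(final String uri) {
        MongoClient client = MongoClients.create(uri);
        return client.getDatabase("db").getCollection("collection").countDocuments();
    }

    // After: MongoClient implements java.io.Closeable, so try-with-resources
    // guarantees close() runs even when the body throws.
    static long closed(final String uri) {
        try (MongoClient client = MongoClients.create(uri)) {
            return client.getDatabase("db").getCollection("collection").countDocuments();
        }
    }
}
```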
```diff
@@ -73,53 +73,52 @@ private List<PartitionIdentifier> buildPartitions(final String collectionName) {
         if (collection.size() < 2) {
             throw new IllegalArgumentException("Invalid Collection Name. Must as db.collection format");
         }
-        MongoClient mongoClient = MongoDBHelper.getMongoClient(mongoDBConfig);
-        MongoDatabase db = mongoClient.getDatabase(collection.get(0));
-        MongoCollection<Document> col = db.getCollection(collection.get(1));
-
-        int chunkSize = this.mongoDBConfig.getExportConfig().getItemsPerPartition();
-        FindIterable<Document> startIterable = col.find()
-                .projection(new Document("_id", 1))
-                .sort(new Document("_id", 1))
-                .limit(1);
-
-        while (true) {
-            try (MongoCursor<Document> startCursor = startIterable.iterator()) {
-                if (!startCursor.hasNext()) {
-                    break;
-                }
-                Document startDoc = startCursor.next();
-                Object gteValue = startDoc.get("_id");
-                String className = gteValue.getClass().getName();
-
-                // Get end doc
-                Document endDoc = startIterable.skip(chunkSize - 1).limit(1).first();
-                if (endDoc == null) {
-                    // this means we have reached the end of the doc
-                    endDoc = col.find()
-                            .projection(new Document("_id", 1))
-                            .sort(new Document("_id", -1))
-                            .limit(1)
-                            .first();
-                }
-                if (endDoc == null) {
-                    break;
-                }
-
-                Object lteValue = endDoc.get("_id");
-                LOG.info("Chunk of " + collectionName + ": {gte: " + gteValue.toString() + ", lte: " + lteValue.toString() + "}");
-                collectionPartitions.add(
-                        PartitionIdentifier
-                                .builder()
-                                .withPartitionKey(String.format(DOCUMENTDB_PARTITION_KEY_FORMAT, collectionName, gteValue, lteValue, className))
-                                .build());
-
-                startIterable = col.find(Filters.gt("_id", lteValue))
-                        .projection(new Document("_id", 1))
-                        .sort(new Document("_id", 1))
-                        .limit(1);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
+        try (MongoClient mongoClient = MongoDBHelper.getMongoClient(mongoDBConfig)) {
+            MongoDatabase db = mongoClient.getDatabase(collection.get(0));
+            MongoCollection<Document> col = db.getCollection(collection.get(1));
+            int chunkSize = this.mongoDBConfig.getExportConfig().getItemsPerPartition();
+            FindIterable<Document> startIterable = col.find()
+                    .projection(new Document("_id", 1))
+                    .sort(new Document("_id", 1))
+                    .limit(1);
+            while (true) {
+                try (MongoCursor<Document> startCursor = startIterable.iterator()) {
+                    if (!startCursor.hasNext()) {
+                        break;
+                    }
+                    Document startDoc = startCursor.next();
+                    Object gteValue = startDoc.get("_id");
+                    String className = gteValue.getClass().getName();
+
+                    // Get end doc
+                    Document endDoc = startIterable.skip(chunkSize - 1).limit(1).first();
+                    if (endDoc == null) {
+                        // this means we have reached the end of the doc
+                        endDoc = col.find()
+                                .projection(new Document("_id", 1))
+                                .sort(new Document("_id", -1))
+                                .limit(1)
+                                .first();
+                    }
+                    if (endDoc == null) {
+                        break;
+                    }
+
+                    Object lteValue = endDoc.get("_id");
+                    LOG.info("Chunk of " + collectionName + ": {gte: " + gteValue.toString() + ", lte: " + lteValue.toString() + "}");
+                    collectionPartitions.add(
+                            PartitionIdentifier
+                                    .builder()
+                                    .withPartitionKey(String.format(DOCUMENTDB_PARTITION_KEY_FORMAT, collectionName, gteValue, lteValue, className))
+                                    .build());
+
+                    startIterable = col.find(Filters.gt("_id", lteValue))
+                            .projection(new Document("_id", 1))
+                            .sort(new Document("_id", 1))
+                            .limit(1);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
         }
         return collectionPartitions;
```
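
The partition-building loop itself is unchanged by this commit: it walks the collection in `_id` order, emitting `[gte, lte]` ranges of at most `itemsPerPartition` documents, and resumes each pass from `Filters.gt("_id", lteValue)`. A standalone sketch of that chunking scheme, with placeholder connection details and the Data Prepper plumbing stripped out:

```java
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import org.bson.Document;

public class IdRangeChunksSketch {
    public static void main(String[] args) {
        final int chunkSize = 1000; // stands in for getItemsPerPartition()
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> col = client.getDatabase("db").getCollection("collection");
            FindIterable<Document> start = col.find()
                    .projection(new Document("_id", 1))
                    .sort(new Document("_id", 1))
                    .limit(1);
            while (true) {
                Document first = start.first();
                if (first == null) {
                    break; // no documents remain
                }
                Object gte = first.get("_id");
                // Last _id of this chunk; when fewer than chunkSize documents
                // remain, fall back to the collection's maximum _id.
                Document last = start.skip(chunkSize - 1).limit(1).first();
                if (last == null) {
                    last = col.find()
                            .projection(new Document("_id", 1))
                            .sort(new Document("_id", -1))
                            .limit(1)
                            .first();
                }
                Object lte = last.get("_id");
                System.out.println("chunk: {gte: " + gte + ", lte: " + lte + "}");
                // The next pass starts strictly after the range just emitted.
                start = col.find(Filters.gt("_id", lte))
                        .projection(new Document("_id", 1))
                        .sort(new Document("_id", 1))
                        .limit(1);
            }
        }
    }
}
```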
```diff
@@ -137,49 +137,48 @@ private void startProcessPartition(SourcePartition<MongoDBSnapshotProgressState>
         if (collection.size() < 2) {
             throw new RuntimeException("Invalid Collection Name. Must as db.collection format");
         }
-        MongoClient mongoClient = MongoDBHelper.getMongoClient(mongoDBConfig);
-        MongoDatabase db = mongoClient.getDatabase(collection.get(0));
-        MongoCollection<Document> col = db.getCollection(collection.get(1));
-        Bson query = MongoDBHelper.buildAndQuery(gte, lte, className);
-        long totalRecords = 0L;
-        long successRecords = 0L;
-        long failedRecords = 0L;
-        try (MongoCursor<Document> cursor = col.find(query).iterator()) {
-            while (cursor.hasNext()) {
-                try {
-                    JsonWriterSettings writerSettings = JsonWriterSettings.builder()
-                            .outputMode(JsonMode.RELAXED)
-                            .objectIdConverter((value, writer) -> writer.writeString(value.toHexString()))
-                            .build();
-                    String record = cursor.next().toJson(writerSettings);
-                    Map<String, Object> data = convertToMap(record);
-                    data.putIfAbsent(EVENT_SOURCE_DB_ATTRIBUTE, collection.get(0));
-                    data.putIfAbsent(EVENT_SOURCE_COLLECTION_ATTRIBUTE, collection.get(1));
-                    data.putIfAbsent(EVENT_SOURCE_OPERATION, OpenSearchBulkActions.CREATE.toString());
-                    data.putIfAbsent(EVENT_SOURCE_TS_MS, 0);
-                    if (buffer.isByteBuffer()) {
-                        buffer.writeBytes(objectMapper.writeValueAsBytes(data), null, DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
-                    } else {
-                        buffer.write(getEventFromData(data), DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
-                    }
-                    successItemsCounter.increment();
-                    successRecords += 1;
-                } catch (Exception e) {
-                    LOG.error("failed to add record to buffer with error {}", e.getMessage());
-                    failureItemsCounter.increment();
-                    failedRecords += 1;
-                } finally {
-                    totalRecords += 1;
-                }
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        } finally {
-            final MongoDBSnapshotProgressState progressState = new MongoDBSnapshotProgressState();
-            progressState.setTotal(totalRecords);
-            progressState.setSuccess(successRecords);
-            progressState.setFailure(failedRecords);
-            sourceCoordinator.saveProgressStateForPartition(partition.getPartitionKey(), progressState);
-        }
+        try (MongoClient mongoClient = MongoDBHelper.getMongoClient(mongoDBConfig)) {
+            MongoDatabase db = mongoClient.getDatabase(collection.get(0));
+            MongoCollection<Document> col = db.getCollection(collection.get(1));
+            Bson query = MongoDBHelper.buildAndQuery(gte, lte, className);
+            long totalRecords = 0L;
+            long successRecords = 0L;
+            long failedRecords = 0L;
+            try (MongoCursor<Document> cursor = col.find(query).iterator()) {
+                while (cursor.hasNext()) {
+                    totalRecords += 1;
+                    try {
+                        JsonWriterSettings writerSettings = JsonWriterSettings.builder()
+                                .outputMode(JsonMode.RELAXED)
+                                .objectIdConverter((value, writer) -> writer.writeString(value.toHexString()))
+                                .build();
+                        String record = cursor.next().toJson(writerSettings);
+                        Map<String, Object> data = convertToMap(record);
+                        data.putIfAbsent(EVENT_SOURCE_DB_ATTRIBUTE, collection.get(0));
+                        data.putIfAbsent(EVENT_SOURCE_COLLECTION_ATTRIBUTE, collection.get(1));
+                        data.putIfAbsent(EVENT_SOURCE_OPERATION, OpenSearchBulkActions.CREATE.toString());
+                        data.putIfAbsent(EVENT_SOURCE_TS_MS, 0);
+                        if (buffer.isByteBuffer()) {
+                            buffer.writeBytes(objectMapper.writeValueAsBytes(data), null, DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
+                        } else {
+                            buffer.write(getEventFromData(data), DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
+                        }
+                        successItemsCounter.increment();
+                        successRecords += 1;
+                    } catch (Exception e) {
+                        LOG.error("failed to add record to buffer with error {}", e.getMessage());
+                        failureItemsCounter.increment();
+                        failedRecords += 1;
+                    }
+                }
+                final MongoDBSnapshotProgressState progressState = new MongoDBSnapshotProgressState();
+                progressState.setTotal(totalRecords);
+                progressState.setSuccess(successRecords);
+                progressState.setFailure(failedRecords);
+                sourceCoordinator.saveProgressStateForPartition(partition.getPartitionKey(), progressState);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
     }
```

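Worth noting in the worker diff: the per-record `totalRecords` increment now happens at the top of the read loop, and the progress-state save moves from the outer `finally` into the cursor `try` block, so it is skipped when the read loop throws, where the old `finally` always ran it. The JSON settings the worker builds per record can be reproduced in isolation; this small demo (class name and sample values are illustrative) shows what the relaxed-mode converter does to `_id`:

```java
import org.bson.Document;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;
import org.bson.types.ObjectId;

public class RelaxedJsonSketch {
    public static void main(String[] args) {
        // RELAXED mode emits plain JSON values, and the converter replaces the
        // default {"$oid": "..."} wrapper with a bare hex string for ObjectIds.
        JsonWriterSettings writerSettings = JsonWriterSettings.builder()
                .outputMode(JsonMode.RELAXED)
                .objectIdConverter((value, writer) -> writer.writeString(value.toHexString()))
                .build();
        Document doc = new Document("_id", new ObjectId()).append("name", "example");
        // Prints e.g. {"_id": "6554f0...", "name": "example"}
        System.out.println(doc.toJson(writerSettings));
    }
}
```

Since the settings object is immutable and does not depend on the document, building it once above the loop would be equivalent to the per-record construction shown in the diff.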
```diff
@@ -39,6 +39,7 @@
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -107,6 +108,7 @@ public void test_returnPartitionsForCollection() {
         List<PartitionIdentifier> partitions = testSupplier.apply(globalStateMap);
         // Then dependencies are called
         verify(mongoClient).getDatabase(eq("test"));
+        verify(mongoClient, times(1)).close();
         verify(mongoDatabase).getCollection(eq("collection"));
         // And partitions are created
         assertThat(partitions.size(), is(2));
```
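
This test, like the snapshot-worker test further down, verifies the fix the same way: the statically mocked MongoDBHelper hands back a mocked client, and the test asserts it was closed exactly once. A trimmed, hypothetical sketch of that pattern (the real tests wire up many more collaborators):

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import com.mongodb.client.MongoClient;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;

class ClientCloseVerificationSketch {
    @Test
    void clientIsClosedExactlyOnce() {
        MongoClient mongoClient = mock(MongoClient.class);
        try (MockedStatic<MongoDBHelper> helper = mockStatic(MongoDBHelper.class)) {
            helper.when(() -> MongoDBHelper.getMongoClient(any())).thenReturn(mongoClient);
            // ... invoke the partition supplier or snapshot worker here ...
        }
        // try-with-resources in the production code must have closed the client.
        verify(mongoClient, times(1)).close();
    }
}
```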
```diff
@@ -18,6 +18,7 @@
 import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
 import org.opensearch.dataprepper.plugins.kafkaconnect.configuration.MongoDBConfig;
 
+import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -27,6 +28,7 @@
 import static org.mockito.Mockito.mockConstruction;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 public class MongoDBServiceTest {
@@ -39,9 +41,6 @@ public class MongoDBServiceTest {
     @Mock
     private AcknowledgementSetManager acknowledgementSetManager;
 
-    @Mock
-    private ScheduledExecutorService scheduledExecutorService;
-
     @Mock
     private SourceCoordinator<MongoDBSnapshotProgressState> sourceCoordinator;
 
@@ -58,17 +57,18 @@ public void testConstructor() {
     }
 
     @Test
-    public void testStart() {
-        createObjectUnderTest().start();
-        verify(scheduledExecutorService).schedule(any(Runnable.class), eq(0L), eq(TimeUnit.MILLISECONDS));
+    public void testStartAndStop() throws InterruptedException {
+        when(sourceCoordinator.getNextPartition(mongoDBPartitionCreationSupplier)).thenReturn(Optional.empty());
+        MongoDBService testObject = createObjectUnderTest();
+        testObject.start();
+        Thread.sleep(100);
+        testObject.stop();
     }
 
     private MongoDBService createObjectUnderTest() {
-        try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class);
-             final MockedConstruction<MongoDBPartitionCreationSupplier> mockedConstruction = mockConstruction(MongoDBPartitionCreationSupplier.class, (mock, context) -> {
+        try (final MockedConstruction<MongoDBPartitionCreationSupplier> mockedConstruction = mockConstruction(MongoDBPartitionCreationSupplier.class, (mock, context) -> {
             mongoDBPartitionCreationSupplier = mock;
         })) {
-            executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor).thenReturn(scheduledExecutorService);
             return MongoDBService.create(mongoDBConfig, sourceCoordinator, buffer, acknowledgementSetManager, pluginMetrics);
         }
     }
```
```diff
@@ -227,7 +227,6 @@ private void mockDependencyAndProcessPartition(String partitionKey, boolean shou
         future.cancel(true);
         assertThat(future.isCancelled(), equalTo(true));
         assertThat(executorService.awaitTermination(1000, TimeUnit.MILLISECONDS), equalTo(true));
-
         if (shouldProcessSucceed) {
             // Verify Results
             verify(cursor, times(2)).next();
@@ -238,6 +237,8 @@ private void mockDependencyAndProcessPartition(String partitionKey, boolean shou
             assertThat(progressStates.get(0).getTotal(), is(2L));
             assertThat(progressStates.get(0).getSuccess(), is(2L));
             assertThat(progressStates.get(0).getFailed(), is(0L));
+
+            verify(mongoClient, times(1)).close();
         }
     }
 }
```
