Skip to content

Commit

Permalink
Fix bug when creating partitions.
Browse files Browse the repository at this point in the history
Signed-off-by: Haidong <[email protected]>
  • Loading branch information
Haidong committed Nov 13, 2023
1 parent 6356547 commit c600cf4
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,20 +125,20 @@ public String getTopicPrefix() {
}

public static class ExportConfig {
private static long DEFAULT_ITEMS_PER_PARTITION = 4000;
private static int DEFAULT_ITEMS_PER_PARTITION = 4000;
private static String DEFAULT_READ_PREFERENCE = "secondaryPreferred";
@JsonProperty("acknowledgments")
private boolean acknowledgments = false;
private Boolean acknowledgments = false;
@JsonProperty("items_per_partition")
private long itemsPerPartition = DEFAULT_ITEMS_PER_PARTITION;
private Integer itemsPerPartition = DEFAULT_ITEMS_PER_PARTITION;
@JsonProperty("read_preference")
private String readPreference = DEFAULT_READ_PREFERENCE;

/**
 * Reports whether acknowledgments are enabled for the export.
 *
 * <p>The backing field is bound from the {@code "acknowledgments"} JSON property and may be
 * held as a boxed {@code Boolean}. A missing (null) value is treated as {@code false}
 * instead of failing with a {@link NullPointerException} during auto-unboxing.
 *
 * @return {@code true} only when acknowledgments are explicitly enabled
 */
public boolean getAcknowledgements() {
    // Boolean.TRUE.equals(...) is null-safe and also works if the field is a primitive boolean.
    return Boolean.TRUE.equals(this.acknowledgments);
}

public long getItemsPerPartition() {
/**
 * Returns the number of items each export partition should contain.
 *
 * <p>The backing field is a boxed {@code Integer} bound from the
 * {@code "items_per_partition"} JSON property, so it can be null. Callers assign the
 * result to a primitive {@code int}, so a null value falls back to
 * {@code DEFAULT_ITEMS_PER_PARTITION} rather than causing an unboxing
 * {@link NullPointerException}.
 *
 * @return the configured items-per-partition value, or the default when none is set
 */
public Integer getItemsPerPartition() {
    if (this.itemsPerPartition == null) {
        return DEFAULT_ITEMS_PER_PARTITION;
    }
    return this.itemsPerPartition;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static MongoClient getMongoClient(final MongoDBConfig mongoDBConfig) {
return MongoClients.create(connectionString);
}

public static Bson buildQuery(String gte, String lte, String className) {
public static Bson buildAndQuery(String gte, String lte, String className) {
switch (className) {
case "java.lang.Integer":
return and(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

package org.opensearch.dataprepper.plugins.kafkaconnect.source.mongoDB;

import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import org.bson.Document;
import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;
import org.opensearch.dataprepper.plugins.kafkaconnect.configuration.MongoDBConfig;
Expand All @@ -24,7 +26,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

public class MongoDBPartitionCreationSupplier implements Function<Map<String, Object>, List<PartitionIdentifier>> {
public class MongoDBPartitionCreationSupplier implements Function<Map<String, Object>, List<PartitionIdentifier>> {
public static final String GLOBAL_STATE_PARTITIONED_COLLECTION_KEY = "partitionedCollections";
private static final Logger LOG = LoggerFactory.getLogger(MongoDBPartitionCreationSupplier.class);
private static final String DOCUMENTDB_PARTITION_KEY_FORMAT = "%s|%s|%s|%s"; // partition format: <db.collection>|<gte>|<lt>|<className>
Expand Down Expand Up @@ -75,52 +77,50 @@ private List<PartitionIdentifier> buildPartitions(final String collectionName) {
MongoDatabase db = mongoClient.getDatabase(collection.get(0));
MongoCollection<Document> col = db.getCollection(collection.get(1));

long totalCount = col.countDocuments();
long chunkSize = this.mongoDBConfig.getExportConfig().getItemsPerPartition();
long startIndex = 0;
long endIndex = startIndex + chunkSize - 1;

while (startIndex < totalCount) {
MongoCursor<Document> cursor = col.find()
.projection(new Document("_id", 1))
.sort(new Document("_id", 1))
.skip((int) startIndex)
.limit(1)
.iterator();

Document firstDoc = cursor.hasNext() ? cursor.next() : null;

// Get second doc
MongoCursor<Document> secondCursor = col.find()
.projection(new Document("_id", 1))
.sort(new Document("_id", 1))
.skip((int)endIndex)
.limit(1)
.iterator();
if (!secondCursor.hasNext()) {
// this means we have reached the end of the doc
secondCursor = col.find()
.projection(new Document("_id", 1))
.sort(new Document("_id", -1))
.limit(1)
.iterator();
}
Document secondDoc = secondCursor.hasNext() ? secondCursor.next() : null;

if (firstDoc != null && secondDoc != null) {
Object gteValue = firstDoc.get("_id");
Object lteValue = secondDoc.get("_id");
int chunkSize = this.mongoDBConfig.getExportConfig().getItemsPerPartition();
FindIterable<Document> startIterable = col.find()
.projection(new Document("_id", 1))
.sort(new Document("_id", 1))
.limit(1);

while (true) {
try (MongoCursor<Document> startCursor = startIterable.iterator()) {
if (!startCursor.hasNext()) {
break;
}
Document startDoc = startCursor.next();
Object gteValue = startDoc.get("_id");
String className = gteValue.getClass().getName();

// Get end doc
Document endDoc = startIterable.skip(chunkSize - 1).limit(1).first();
if (endDoc == null) {
// Fewer than chunkSize documents remain after the start doc; fall back to the collection's last _id as the end of this chunk
endDoc = col.find()
.projection(new Document("_id", 1))
.sort(new Document("_id", -1))
.limit(1)
.first();
}
if (endDoc == null) {
break;
}

Object lteValue = endDoc.get("_id");
LOG.info("Chunk of " + collectionName + ": {gte: " + gteValue.toString() + ", lte: " + lteValue.toString() + "}");
collectionPartitions.add(
PartitionIdentifier
.builder()
.withPartitionKey(String.format(DOCUMENTDB_PARTITION_KEY_FORMAT, collectionName, gteValue, lteValue, className))
.build());

startIterable = col.find(Filters.gt("_id", lteValue))
.projection(new Document("_id", 1))
.sort(new Document("_id", 1))
.limit(1);
} catch (Exception e) {
throw new RuntimeException(e);
}
startIndex = endIndex + 1;
endIndex = startIndex + chunkSize - 1;
}
return collectionPartitions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,11 @@ private void startProcessPartition(SourcePartition<MongoDBSnapshotProgressState>
MongoClient mongoClient = MongoDBHelper.getMongoClient(mongoDBConfig);
MongoDatabase db = mongoClient.getDatabase(collection.get(0));
MongoCollection<Document> col = db.getCollection(collection.get(1));
Bson query = MongoDBHelper.buildQuery(gte, lte, className);
MongoCursor<Document> cursor = col.find(query).iterator();
Bson query = MongoDBHelper.buildAndQuery(gte, lte, className);
long totalRecords = 0L;
long successRecords = 0L;
long failedRecords = 0L;
try {
try (MongoCursor<Document> cursor = col.find(query).iterator()) {
while (cursor.hasNext()) {
try {
String record = cursor.next().toJson();
Expand All @@ -170,7 +169,6 @@ private void startProcessPartition(SourcePartition<MongoDBSnapshotProgressState>
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
cursor.close();
final MongoDBSnapshotProgressState progressState = new MongoDBSnapshotProgressState();
progressState.setTotal(totalRecords);
progressState.setSuccess(successRecords);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void test_get_mongodb_config_props() throws IOException {
assertThat(testConfig.getSSLInvalidHostAllowed(), is(false));
assertThat(testConfig.getCollections().size(), is(1));
assertThat(testConfig.getExportConfig().getAcknowledgements(), is(false));
assertThat(testConfig.getExportConfig().getItemsPerPartition(), is(4000L));
assertThat(testConfig.getExportConfig().getItemsPerPartition(), is(4000));
assertThat(testConfig.getExportConfig().getReadPreference(), is("secondaryPreferred"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -85,18 +86,21 @@ public void test_returnPartitionsForCollection() {
mockedMongoClientsStatic.when(() -> MongoClients.create(anyString())).thenReturn(mongoClient);
when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase);
when(mongoDatabase.getCollection(anyString())).thenReturn(col);
when(col.countDocuments()).thenReturn(5000L);
when(col.find()).thenReturn(findIterable);
when(col.find(any(Bson.class))).thenReturn(findIterable);
when(findIterable.projection(any())).thenReturn(findIterable);
when(findIterable.sort(any())).thenReturn(findIterable);
when(findIterable.skip(anyInt())).thenReturn(findIterable);
when(findIterable.limit(anyInt())).thenReturn(findIterable);
when(findIterable.iterator()).thenReturn(cursor);
when(cursor.hasNext()).thenReturn(true, true, true, true, false, true);
when(cursor.hasNext()).thenReturn(true, true, false);
// mock startDoc and endDoc returns, 0-3999, and 4000-4999
when(cursor.next())
.thenReturn(new Document("_id", "0"))
.thenReturn(new Document("_id", "4000"));
when(findIterable.first())
.thenReturn(new Document("_id", "3999"))
.thenReturn(new Document("_id", "4000"))
.thenReturn(null)
.thenReturn(new Document("_id", "4999"));
// When applying the partition creation logic
final Map<String, Object> globalStateMap = new HashMap<>();
Expand Down

0 comments on commit c600cf4

Please sign in to comment.