Skip to content

Commit

Permalink
support more id types for Bson Type
Browse files Browse the repository at this point in the history
Signed-off-by: Haidong <[email protected]>
  • Loading branch information
Haidong committed Dec 4, 2023
1 parent d23d4e3 commit d6d3c38
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.BSONTimestamp;
import org.bson.types.Binary;
import org.bson.types.Code;
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;
import org.bson.types.Symbol;
import org.opensearch.dataprepper.plugins.kafkaconnect.configuration.MongoDBConfig;

import static com.mongodb.client.model.Filters.and;
Expand All @@ -18,6 +23,10 @@

public class MongoDBHelper {
private static final String MONGO_CONNECTION_STRING_TEMPLATE = "mongodb://%s:%s@%s:%s/?replicaSet=rs0&directConnection=true&readpreference=%s&ssl=%s&tlsAllowInvalidHostnames=%s";
private static final String BINARY_PARTITION_FORMAT = "%s-%s";
private static final String BINARY_PARTITION_SPLITTER = "-";
private static final String TIMESTAMP_PARTITION_FORMAT = "%s-%s";
private static final String TIMESTAMP_PARTITION_SPLITTER = "-";

public static MongoClient getMongoClient(final MongoDBConfig mongoDBConfig) {
String username = mongoDBConfig.getCredentialsConfig().getUsername();
Expand All @@ -32,6 +41,27 @@ public static MongoClient getMongoClient(final MongoDBConfig mongoDBConfig) {
return MongoClients.create(connectionString);
}

public static String getPartitionStringFromMongoDBId(Object id, String className) {
switch (className) {
case "org.bson.Document":
return ((Document) id).toJson();
case "org.bson.types.Binary":
final byte type = ((Binary) id).getType();
final byte[] data = ((Binary) id).getData();
String typeString = String.valueOf((int) type);
String dataString = new String(data);
return String.format(BINARY_PARTITION_FORMAT, typeString, dataString);
case "org.bson.types.BSONTimestamp":
final int inc = ((BSONTimestamp) id).getInc();
final int time = ((BSONTimestamp) id).getTime();
return String.format(TIMESTAMP_PARTITION_FORMAT, inc, time);
case "org.bson.types.code":
return ((Code) id).getCode();
default:
return id.toString();
}
}

public static Bson buildAndQuery(String gte, String lte, String className) {
switch (className) {
case "java.lang.Integer":
Expand Down Expand Up @@ -64,6 +94,35 @@ public static Bson buildAndQuery(String gte, String lte, String className) {
gte("_id", Decimal128.parse(gte)),
lte("_id", Decimal128.parse(lte))
);
case "org.bson.types.Binary":
String[] gteString = gte.split(BINARY_PARTITION_SPLITTER, 2);
String[] lteString = lte.split(BINARY_PARTITION_SPLITTER, 2);
return and(
gte("_id", new Binary(Byte.parseByte(gteString[0]), gteString[1].getBytes())),
lte("_id", new Binary(Byte.parseByte(lteString[0]), lteString[1].getBytes()))
);
case "org.bson.types.BSONTimestamp":
String[] gteTimestampString = gte.split(TIMESTAMP_PARTITION_SPLITTER, 2);
String[] lteTimestampString = lte.split(TIMESTAMP_PARTITION_SPLITTER, 2);
return and(
gte("_id", new BSONTimestamp(Integer.parseInt(gteTimestampString[0]), Integer.parseInt(gteTimestampString[1]))),
lte("_id", new BSONTimestamp(Integer.parseInt(lteTimestampString[0]), Integer.parseInt(lteTimestampString[1])))
);
case "org.bson.types.code":
return and(
gte("_id", new Code(gte)),
lte("_id", new Code(lte))
);
case "org.bson.types.Symbol":
return and(
gte("_id", new Symbol(gte)),
lte("_id", new Symbol(lte))
);
case "org.bson.Document":
return and(
gte("_id", Document.parse(gte)),
lte("_id", Document.parse(lte))
);
default:
throw new RuntimeException("Unexpected _id class supported: " + className);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,13 @@ private List<PartitionIdentifier> buildPartitions(final String collectionName) {
}

Object lteValue = endDoc.get("_id");
LOG.info("Chunk of " + collectionName + ": {gte: " + gteValue.toString() + ", lte: " + lteValue.toString() + "}");
String gteValueString = MongoDBHelper.getPartitionStringFromMongoDBId(gteValue, className);
String lteValueString = MongoDBHelper.getPartitionStringFromMongoDBId(lteValue, className);
LOG.info("Chunk of " + collectionName + ": {gte: " + gteValueString + ", lte: " + lteValueString + "}");
collectionPartitions.add(
PartitionIdentifier
.builder()
.withPartitionKey(String.format(MONGODB_PARTITION_KEY_FORMAT, collectionName, gteValue, lteValue, className))
.withPartitionKey(String.format(MONGODB_PARTITION_KEY_FORMAT, collectionName, gteValueString, lteValueString, className))
.build());

startIterable = col.find(Filters.gt("_id", lteValue))
Expand Down

0 comments on commit d6d3c38

Please sign in to comment.