add more unit tests
Signed-off-by: Haidong <[email protected]>
Haidong committed Nov 8, 2023
1 parent 05c8d7e commit 94e216e
Showing 4 changed files with 292 additions and 56 deletions.
MongoDBHelper.java
@@ -7,8 +7,14 @@

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.opensearch.dataprepper.plugins.kafkaconnect.configuration.MongoDBConfig;

import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.gte;
import static com.mongodb.client.model.Filters.lte;

public class MongoDBHelper {

public static MongoClient getMongoClient(final MongoDBConfig mongoDBConfig) {
@@ -24,4 +30,36 @@ public static MongoClient getMongoClient(final MongoDBConfig mongoDBConfig) {

return MongoClients.create(connectionString);
}

public static Bson buildQuery(String gte, String lte, String className) {
switch (className) {
case "java.lang.Integer":
return and(
gte("_id", Integer.parseInt(gte)),
lte("_id", Integer.parseInt(lte))
);
case "java.lang.Double":
return and(
gte("_id", Double.parseDouble(gte)),
lte("_id", Double.parseDouble(lte))
);
case "java.lang.String":
return and(
gte("_id", gte),
lte("_id", lte)
);
case "java.lang.Long":
return and(
gte("_id", Long.parseLong(gte)),
lte("_id", Long.parseLong(lte))
);
case "org.bson.types.ObjectId":
return and(
gte("_id", new ObjectId(gte)),
lte("_id", new ObjectId(lte))
);
default:
throw new RuntimeException("Unsupported _id class: " + className);
}
}
}
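
For illustration of the kind of coverage the commit message refers to, here is a minimal sketch of a unit test for the relocated MongoDBHelper.buildQuery helper. The test class name, test cases, and assertions are assumptions for this sketch (not code from the commit), and JUnit 5 plus the MongoDB Java driver are assumed to be on the test classpath:

import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.gte;
import static com.mongodb.client.model.Filters.lte;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.mongodb.MongoClientSettings;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.junit.jupiter.api.Test;

// Package declaration omitted; the class is assumed to sit alongside MongoDBHelper so no import of it is needed.
public class MongoDBHelperBuildQueryTest {

    // Render a filter to a BsonDocument so that structurally identical filters compare equal.
    private BsonDocument render(final Bson filter) {
        return filter.toBsonDocument(BsonDocument.class, MongoClientSettings.getDefaultCodecRegistry());
    }

    @Test
    void buildQueryParsesIntegerBounds() {
        final Bson actual = MongoDBHelper.buildQuery("1", "10", "java.lang.Integer");
        final Bson expected = and(gte("_id", 1), lte("_id", 10));
        assertEquals(render(expected), render(actual));
    }

    @Test
    void buildQueryHandlesObjectIdBounds() {
        final String low = "507f1f77bcf86cd799439011";
        final String high = "507f1f77bcf86cd799439099";
        final Bson actual = MongoDBHelper.buildQuery(low, high, "org.bson.types.ObjectId");
        final Bson expected = and(gte("_id", new ObjectId(low)), lte("_id", new ObjectId(high)));
        assertEquals(render(expected), render(actual));
    }

    @Test
    void buildQueryRejectsUnsupportedIdClass() {
        assertThrows(RuntimeException.class,
                () -> MongoDBHelper.buildQuery("1", "10", "java.util.Date"));
    }
}

Comparing rendered BsonDocuments avoids relying on equals() semantics of the driver's filter builders, and the gte/lte pair keeps both range bounds inclusive, matching buildQuery itself.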
MongoDBSnapshotWorker.java
@@ -15,7 +15,6 @@
import io.micrometer.core.instrument.Counter;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -35,16 +34,13 @@
import java.util.Map;
import java.util.Optional;

import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.gte;
import static com.mongodb.client.model.Filters.lte;

public class MongoDBSnapshotWorker implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(MongoDBSnapshotWorker.class);
private static final Duration BACKOFF_ON_EXCEPTION = Duration.ofSeconds(60);
private static final Duration BACKOFF_ON_EMPTY_PARTITION = Duration.ofSeconds(60);
private static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(2);
private static final String SUCCESS_ITEM_COUNTER_NAME = "exportRecordsSuccessTotal";
private static final String FAILURE_ITEM_COUNTER_NAME = "exportRecordsFailedTotal";
private static final String SUCCESS_PARTITION_COUNTER_NAME = "exportPartitionSuccessTotal";
private static final String FAILURE_PARTITION_COUNTER_NAME = "exportPartitionFailureTotal";
private static final String EVENT_SOURCE_COLLECTION_ATTRIBUTE = "__collection";
@@ -58,10 +54,12 @@ public class MongoDBSnapshotWorker {
private final AcknowledgementSetManager acknowledgementSetManager;
private final MongoDBConfig mongoDBConfig;
private final Counter successItemsCounter;
private final Counter failureItemsCounter;
private final Counter successPartitionCounter;
private final Counter failureParitionCounter;
private final ObjectMapper objectMapper = new ObjectMapper();
private final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>() {};
private final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>() {
};


public MongoDBSnapshotWorker(final SourceCoordinator<MongoDBSnapshotProgressState> sourceCoordinator,
@@ -76,6 +74,7 @@ public MongoDBSnapshotWorker(final SourceCoordinator<MongoDBSnapshotProgressStat
this.acknowledgementSetManager = acknowledgementSetManager;
this.mongoDBConfig = mongoDBConfig;
this.successItemsCounter = pluginMetrics.counter(SUCCESS_ITEM_COUNTER_NAME);
this.failureItemsCounter = pluginMetrics.counter(FAILURE_ITEM_COUNTER_NAME);
this.successPartitionCounter = pluginMetrics.counter(SUCCESS_PARTITION_COUNTER_NAME);
this.failureParitionCounter = pluginMetrics.counter(FAILURE_PARTITION_COUNTER_NAME);
}
@@ -107,6 +106,7 @@ public void run() {
}
successPartitionCounter.increment();
} catch (final Exception e) {
LOG.error("Received an exception while processing the partition.", e);
sourceCoordinator.giveUpPartitions();
failureParitionCounter.increment();
}
@@ -137,21 +137,26 @@ private void startProcessPartition(SourcePartition<MongoDBSnapshotProgressState>
MongoClient mongoClient = MongoDBHelper.getMongoClient(mongoDBConfig);
MongoDatabase db = mongoClient.getDatabase(collection.get(0));
MongoCollection<Document> col = db.getCollection(collection.get(1));
Bson query = this.buildQuery(gte, lte, className);
Bson query = MongoDBHelper.buildQuery(gte, lte, className);
MongoCursor<Document> cursor = col.find(query).iterator();
int totalRecords = 0;
try {
while (cursor.hasNext()) {

String record = cursor.next().toJson();
Map<String, Object> data = convertToMap(record);
data.putIfAbsent(EVENT_SOURCE_DB_ATTRIBUTE, collection.get(0));
data.putIfAbsent(EVENT_SOURCE_COLLECTION_ATTRIBUTE, collection.get(1));
data.putIfAbsent(EVENT_SOURCE_OPERATION, OpenSearchBulkActions.CREATE.toString());
if (buffer.isByteBuffer()) {
buffer.writeBytes(objectMapper.writeValueAsBytes(data), null, DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
} else {
buffer.write(getEventFromData(data), DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
try {
Map<String, Object> data = convertToMap(record);
data.putIfAbsent(EVENT_SOURCE_DB_ATTRIBUTE, collection.get(0));
data.putIfAbsent(EVENT_SOURCE_COLLECTION_ATTRIBUTE, collection.get(1));
data.putIfAbsent(EVENT_SOURCE_OPERATION, OpenSearchBulkActions.CREATE.toString());
if (buffer.isByteBuffer()) {
buffer.writeBytes(objectMapper.writeValueAsBytes(data), null, DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
} else {
buffer.write(getEventFromData(data), DEFAULT_BUFFER_WRITE_TIMEOUT_MS);
}
} catch (JsonProcessingException e) {
LOG.error("failed to add record to buffer with error {}", e.getMessage());
failureItemsCounter.increment();
continue;
}
successItemsCounter.increment();
totalRecords += 1;
@@ -166,39 +171,6 @@ private void startProcessPartition(SourcePartition<MongoDBSnapshotProgressState>
sourceCoordinator.saveProgressStateForPartition(partition.getPartitionKey(), progressState);
}

private Bson buildQuery(String gte, String lte, String className) {
switch (className) {
case "java.lang.Integer":
return and(
gte("_id", Integer.parseInt(gte)),
lte("_id", Integer.parseInt(lte))
);
case "java.lang.Double":
return and(
gte("_id", Double.parseDouble(gte)),
lte("_id", Double.parseDouble(lte))
);
case "java.lang.String":
return and(
gte("_id", gte),
lte("_id", lte)
);
case "java.lang.Long":
return and(
gte("_id", Long.parseLong(gte)),
lte("_id", Long.parseLong(lte))
);
case "org.bson.types.ObjectId":
return and(
gte("_id", new ObjectId(gte)),
lte("_id", new ObjectId(lte))
);
default:
throw new RuntimeException("Unexpected _id class supported: " + className);
}

}

private Optional<AcknowledgementSet> createAcknowledgementSet(SourcePartition<MongoDBSnapshotProgressState> partition) {
if (mongoDBConfig.getExportConfig().getAcknowledgements()) {
return Optional.of(this.acknowledgementSetManager.create((result) -> {
@@ -210,12 +182,8 @@ private Optional<AcknowledgementSet> createAcknowledgementSet(SourcePartition<Mo
return Optional.empty();
}

private Map<String, Object> convertToMap(String jsonData) {
try {
return objectMapper.readValue(jsonData, mapTypeReference);
} catch (JsonProcessingException e) {
return null;
}
private Map<String, Object> convertToMap(String jsonData) throws JsonProcessingException {
return objectMapper.readValue(jsonData, mapTypeReference);
}

private Record<Object> getEventFromData(Map<String, Object> data) {
MongoDBPartitionCreationSupplierTest.java
@@ -58,7 +58,7 @@ public void setup() {
collectionConfig = mock(MongoDBConfig.CollectionConfig.class);
lenient().when(collectionConfig.getCollectionName()).thenReturn(TEST_COLLECTION_NAME);
lenient().when(mongoDBConfig.getCollections()).thenReturn(Collections.singletonList(collectionConfig));
lenient().when(mongoDBConfig.getCredentialsConfig()).thenReturn(new CredentialsConfig(new CredentialsConfig.PlainText("", ""), null));
lenient().when(mongoDBConfig.getCredentialsConfig()).thenReturn(new CredentialsConfig(new CredentialsConfig.PlainText("user", "user"), null));
lenient().when(mongoDBConfig.getExportConfig()).thenReturn(new MongoDBConfig.ExportConfig());
testSupplier = new MongoDBPartitionCreationSupplier(mongoDBConfig);
}