Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Haidong <[email protected]>
  • Loading branch information
Haidong committed Nov 30, 2023
1 parent f2251cd commit d23d4e3
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 11 deletions.
4 changes: 2 additions & 2 deletions data-prepper-plugins/kafka-connect-plugins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ Snapshot Mode [ref](https://debezium.io/documentation/reference/stable/connector
### MongoDB
| Option | Required | Default | Type | Description |
|----------------|----------|---------------|--------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| hostname | YES | | String | The hostname of MySQL. |
| port | NO | 27017 | String | The port of MySQL. |
| hostname | YES | | String | The hostname of MongoDB server. |
| port | NO | 27017 | String | The port of MongoDB server. |
| ssl | NO | FALSE | Boolean | Connector will use SSL to connect to MongoDB instances. |
| ingestion_mode | NO | export_stream | String | MongoDB ingestion mode. Available options: export_stream, stream, export |
| export_config | NO | | ExportConfig | The Export Config |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Objects;

/**
Expand All @@ -35,6 +36,7 @@
@DataPrepperPlugin(name = "mongodb", pluginType = Source.class, pluginConfigurationType = MongoDBConfig.class)
public class MongoDBSource extends KafkaConnectSource implements UsesSourceCoordination {
private static final Logger LOG = LoggerFactory.getLogger(MongoDBSource.class);
private static final String COLLECTION_SPLITTER = "\\.";

private final AwsCredentialsSupplier awsCredentialsSupplier;

Expand All @@ -61,6 +63,7 @@ public MongoDBSource(final MongoDBConfig mongoDBConfig,
this.acknowledgementSetManager = acknowledgementSetManager;
this.awsCredentialsSupplier = awsCredentialsSupplier;
this.byteDecoder = new JsonDecoder();
this.validateCollections();
}

@Override
Expand Down Expand Up @@ -110,4 +113,15 @@ private boolean shouldStartInitialLoad() {
return mongoDBConfig.getIngestionMode() == MongoDBConfig.IngestionMode.EXPORT_STREAM
|| mongoDBConfig.getIngestionMode() == MongoDBConfig.IngestionMode.EXPORT;
}

private void validateCollections() {
MongoDBConfig config = (MongoDBConfig) this.connectorConfig;
List<MongoDBConfig.CollectionConfig> collectionConfigs = config.getCollections();
collectionConfigs.forEach(collectionConfig -> {
List<String> collection = List.of(collectionConfig.getCollectionName().split(COLLECTION_SPLITTER));
if (collection.size() < 2) {
throw new IllegalArgumentException("Invalid Collection Name. Must be in db.collection format");
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.conversions.Bson;
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;
import org.opensearch.dataprepper.plugins.kafkaconnect.configuration.MongoDBConfig;

Expand All @@ -16,17 +17,17 @@
import static com.mongodb.client.model.Filters.lte;

public class MongoDBHelper {
private static final String MONGO_CONNECTION_STRING_TEMPLATE = "mongodb://%s:%s@%s:%s/?replicaSet=rs0&directConnection=true&readpreference=%s&ssl=%s&tlsAllowInvalidHostnames=%s";

public static MongoClient getMongoClient(final MongoDBConfig mongoDBConfig) {
String template = "mongodb://%s:%s@%s:%s/?replicaSet=rs0&directConnection=true&readpreference=%s&ssl=%s&tlsAllowInvalidHostnames=%s";
String username = mongoDBConfig.getCredentialsConfig().getUsername();
String password = mongoDBConfig.getCredentialsConfig().getPassword();
String hostname = mongoDBConfig.getHostname();
String port = mongoDBConfig.getPort();
String ssl = mongoDBConfig.getSSLEnabled().toString();
String invalidHostAllowed = mongoDBConfig.getSSLInvalidHostAllowed().toString();
String readPreference = mongoDBConfig.getExportConfig().getReadPreference();
String connectionString = String.format(template, username, password, hostname, port, readPreference, ssl, invalidHostAllowed);
String connectionString = String.format(MONGO_CONNECTION_STRING_TEMPLATE, username, password, hostname, port, readPreference, ssl, invalidHostAllowed);

return MongoClients.create(connectionString);
}
Expand Down Expand Up @@ -58,6 +59,11 @@ public static Bson buildAndQuery(String gte, String lte, String className) {
gte("_id", new ObjectId(gte)),
lte("_id", new ObjectId(lte))
);
case "org.bson.types.Decimal128":
return and(
gte("_id", Decimal128.parse(gte)),
lte("_id", Decimal128.parse(lte))
);
default:
throw new RuntimeException("Unexpected _id class supported: " + className);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
public class MongoDBPartitionCreationSupplier implements Function<Map<String, Object>, List<PartitionIdentifier>> {
public static final String GLOBAL_STATE_PARTITIONED_COLLECTION_KEY = "partitionedCollections";
private static final Logger LOG = LoggerFactory.getLogger(MongoDBPartitionCreationSupplier.class);
private static final String DOCUMENTDB_PARTITION_KEY_FORMAT = "%s|%s|%s|%s"; // partition format: <db.collection>|<gte>|<lt>|<className>
private static final String MONGODB_PARTITION_KEY_FORMAT = "%s|%s|%s|%s"; // partition format: <db.collection>|<gte>|<lt>|<className>
private static final String COLLECTION_SPLITTER = "\\.";

private final MongoDBConfig mongoDBConfig;

public MongoDBPartitionCreationSupplier(final MongoDBConfig mongoDBConfig) {
Expand Down Expand Up @@ -69,9 +71,9 @@ private List<String> getCollectionsToInitPartitions(final MongoDBConfig mongoDBC

private List<PartitionIdentifier> buildPartitions(final String collectionName) {
List<PartitionIdentifier> collectionPartitions = new ArrayList<>();
List<String> collection = List.of(collectionName.split("\\."));
List<String> collection = List.of(collectionName.split(COLLECTION_SPLITTER));
if (collection.size() < 2) {
throw new IllegalArgumentException("Invalid Collection Name. Must as db.collection format");
throw new IllegalArgumentException("Invalid Collection Name. Must be in db.collection format");
}
try (MongoClient mongoClient = MongoDBHelper.getMongoClient(mongoDBConfig)) {
MongoDatabase db = mongoClient.getDatabase(collection.get(0));
Expand Down Expand Up @@ -109,14 +111,15 @@ private List<PartitionIdentifier> buildPartitions(final String collectionName) {
collectionPartitions.add(
PartitionIdentifier
.builder()
.withPartitionKey(String.format(DOCUMENTDB_PARTITION_KEY_FORMAT, collectionName, gteValue, lteValue, className))
.withPartitionKey(String.format(MONGODB_PARTITION_KEY_FORMAT, collectionName, gteValue, lteValue, className))
.build());

startIterable = col.find(Filters.gt("_id", lteValue))
.projection(new Document("_id", 1))
.sort(new Document("_id", 1))
.limit(1);
} catch (Exception e) {
LOG.error("Failed to read start cursor when build partitions", e);
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ public class MongoDBSnapshotWorker implements Runnable {
private static final String EVENT_SOURCE_OPERATION = "__op";
private static final String EVENT_SOURCE_TS_MS = "__source_ts_ms";
private static final String EVENT_TYPE = "EXPORT";
private static int DEFAULT_BUFFER_WRITE_TIMEOUT_MS = 5000;
private static final String PARTITION_KEY_SPLITTER = "\\|";
private static final String COLLECTION_SPLITTER = "\\.";
private final SourceCoordinator<MongoDBSnapshotProgressState> sourceCoordinator;
private static int DEFAULT_BUFFER_WRITE_TIMEOUT_MS = 5000;
private final Buffer<Record<Object>> buffer;
private final MongoDBPartitionCreationSupplier mongoDBPartitionCreationSupplier;
private final AcknowledgementSetManager acknowledgementSetManager;
Expand Down Expand Up @@ -126,11 +128,11 @@ public void run() {
}

private void startProcessPartition(SourcePartition<MongoDBSnapshotProgressState> partition) {
List<String> partitionKeys = List.of(partition.getPartitionKey().split("\\|"));
List<String> partitionKeys = List.of(partition.getPartitionKey().split(PARTITION_KEY_SPLITTER));
if (partitionKeys.size() < 4) {
throw new RuntimeException("Invalid Partition Key. Must as db.collection|gte|lte format.");
}
List<String> collection = List.of(partitionKeys.get(0).split("\\."));
List<String> collection = List.of(partitionKeys.get(0).split(COLLECTION_SPLITTER));
final String gte = partitionKeys.get(1);
final String lte = partitionKeys.get(2);
final String className = partitionKeys.get(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.opensearch.dataprepper.plugins.kafkaconnect.source.mongoDB.MongoDBService;
import org.opensearch.dataprepper.plugins.kafkaconnect.source.mongoDB.MongoDBSnapshotProgressState;

import java.util.List;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -79,6 +81,22 @@ void testConstructorValidations() {
null));
}

@Test
void testConstructorValidations_invalidCollectionName() {
MongoDBConfig.CollectionConfig collectionConfig = mock(MongoDBConfig.CollectionConfig.class);
when(collectionConfig.getCollectionName()).thenReturn("invalidName");
when(mongoDBConfig.getIngestionMode()).thenReturn(MongoDBConfig.IngestionMode.EXPORT);
when(mongoDBConfig.getCollections()).thenReturn(List.of(collectionConfig));
assertThrows(IllegalArgumentException.class, () -> new MongoDBSource(
mongoDBConfig,
pluginMetrics,
pipelineDescription,
acknowledgementSetManager,
awsCredentialsSupplier,
null,
null));
}

@Test
void testExportConstructor() {
when(mongoDBConfig.getIngestionMode()).thenReturn(MongoDBConfig.IngestionMode.EXPORT);
Expand Down

0 comments on commit d23d4e3

Please sign in to comment.