diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java index 64d4ef415b38..e2ffaabd03c6 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java @@ -18,6 +18,7 @@ package org.apache.nifi.mongodb; import com.mongodb.WriteConcern; +import com.mongodb.client.ClientSession; import com.mongodb.client.MongoDatabase; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; @@ -130,6 +131,7 @@ default Document convertJson(String query) { return Document.parse(query); } MongoDatabase getDatabase(String name); + ClientSession startSession(); String getURI(); WriteConcern getWriteConcern(); } diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java new file mode 100644 index 000000000000..2484ef105c01 --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperations.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.mongodb; + +import com.mongodb.WriteConcern; +import com.mongodb.client.ClientSession; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.BulkWriteOptions; +import com.mongodb.client.model.Collation; +import com.mongodb.client.model.CollationAlternate; +import com.mongodb.client.model.CollationCaseFirst; +import com.mongodb.client.model.CollationMaxVariable; +import com.mongodb.client.model.CollationStrength; +import com.mongodb.client.model.DeleteManyModel; +import com.mongodb.client.model.DeleteOneModel; +import com.mongodb.client.model.DeleteOptions; +import com.mongodb.client.model.InsertOneModel; +import com.mongodb.client.model.ReplaceOneModel; +import com.mongodb.client.model.ReplaceOptions; +import com.mongodb.client.model.UpdateManyModel; +import com.mongodb.client.model.UpdateOneModel; +import com.mongodb.client.model.UpdateOptions; +import com.mongodb.client.model.WriteModel; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.bson.BsonArray; +import org.bson.BsonDocument; +import org.bson.Document; +import org.bson.codecs.BsonArrayCodec; +import org.bson.codecs.DecoderContext; +import org.bson.conversions.Bson; +import org.bson.json.JsonReader; + +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@Tags({ "mongodb", "insert", "update", "write", "put", "bulk" }) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Writes the contents of a FlowFile to MongoDB as a bulk update") +@SystemResourceConsideration(resource = SystemResource.MEMORY) +public class PutMongoBulkOperations extends AbstractMongoProcessor { + static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("All FlowFiles that are written to MongoDB are routed to this relationship").build(); + static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build(); + + static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder() + .name("Ordered") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .description("Ordered execution of bulk writes, stopping at the first error - otherwise writes are executed in arbitrary order and execution continues on error") + .required(false) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + static final PropertyDescriptor TRANSACTIONS_ENABLED = new PropertyDescriptor.Builder() + .name("Transactions Enabled") + 
.expressionLanguageSupported(ExpressionLanguageScope.NONE) + .description("Run all actions in one MongoDB transaction") + .required(false) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("The Character Set in which the data is encoded") + .required(true) + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue("UTF-8") + .build(); + + private final static Set<Relationship> relationships; + private final static List<PropertyDescriptor> propertyDescriptors; + + static { + List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.addAll(descriptors); + _propertyDescriptors.add(ORDERED); + _propertyDescriptors.add(TRANSACTIONS_ENABLED); + _propertyDescriptors.add(CHARACTER_SET); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + final Set<Relationship> _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (null == flowFile) { + return; + } + + final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue()); + final WriteConcern writeConcern = clientService.getWriteConcern(); + + ClientSession clientSession = null; + try { + final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern); + + // parse the FlowFile content as a BSON array of write operations, honoring the configured character set + final BsonArrayCodec arrayCodec = new BsonArrayCodec(); + final DecoderContext decoderContext = DecoderContext.builder().build(); + final BsonArray updateItems; + try (final Reader reader = new InputStreamReader(session.read(flowFile), charset)) { + updateItems = arrayCodec.decode(new JsonReader(reader), decoderContext); + } + + List<WriteModel<Document>> updateModels = new ArrayList<>(); + for (Object item : updateItems) { + final BsonDocument updateItem = (BsonDocument) item; + if (updateItem.keySet().size() != 1) { + getLogger().error("Invalid bulk-update in {}: more than one type given {}", flowFile, String.join(", ", updateItem.keySet())); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + return; + } + final WriteModel<Document> writeModel = getWriteModel(context, session, flowFile, updateItem); + if (null == writeModel) { + getLogger().error("Invalid bulk-update in {}: invalid update type {}", flowFile, getUpdateType(updateItem)); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + return; + } + updateModels.add(writeModel); + } + + if (context.getProperty(TRANSACTIONS_ENABLED).asBoolean()) { + clientSession = clientService.startSession(); + clientSession.startTransaction(); + // now run this within a transaction + collection.bulkWrite(clientSession, updateModels, (new BulkWriteOptions().ordered(context.getProperty(ORDERED).asBoolean()))); + } else { + collection.bulkWrite(updateModels, (new BulkWriteOptions().ordered(context.getProperty(ORDERED).asBoolean()))); + } + getLogger().info("bulk-updated {} into MongoDB", flowFile); + + 
session.getProvenanceReporter().send(flowFile, getURI(context)); + session.transfer(flowFile, REL_SUCCESS); + + if (clientSession != null) { + if (clientSession.hasActiveTransaction()) { + clientSession.commitTransaction(); + } + clientSession.close(); + } + } catch (Exception e) { + getLogger().error("Failed to bulk-update {} into MongoDB", flowFile, e); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + try { + if (clientSession != null) { + if (clientSession.hasActiveTransaction()) { + clientSession.abortTransaction(); + } + clientSession.close(); + } + } catch (Exception ee) { + getLogger().warn("Cannot rollback client session", ee); // (but no further action) + } + } + } + + private WriteModel<Document> getWriteModel(ProcessContext context, ProcessSession session, FlowFile flowFile, BsonDocument updateItem) { + final String updateType = getUpdateType(updateItem); + final BsonDocument updateSpec = (BsonDocument) updateItem.get(updateType); + final WriteModel<Document> writeModel; + if ("insertOne".equals(updateType)) { + writeModel = new InsertOneModel<>(toBsonDocument((BsonDocument) updateSpec.get("document"))); + } else if ("updateOne".equals(updateType)) { + final UpdateOptions options = parseUpdateOptions(updateSpec); + writeModel = new UpdateOneModel<>((BsonDocument) updateSpec.get("filter"), (BsonDocument) updateSpec.get("update"), options); + } else if ("updateMany".equals(updateType)) { + final UpdateOptions options = parseUpdateOptions(updateSpec); + writeModel = new UpdateManyModel<>((BsonDocument) updateSpec.get("filter"), (BsonDocument) updateSpec.get("update"), options); + } else if ("replaceOne".equals(updateType)) { + final ReplaceOptions options = parseReplaceOptions(updateSpec); + writeModel = new ReplaceOneModel<>((BsonDocument) updateSpec.get("filter"), + toBsonDocument((BsonDocument) updateSpec.get("replacement")), options); + } else if ("deleteOne".equals(updateType)) { + final DeleteOptions options = parseDeleteOptions(updateSpec); + writeModel = new DeleteOneModel<>((BsonDocument) updateSpec.get("filter"), options); + } else if ("deleteMany".equals(updateType)) { + final DeleteOptions options = parseDeleteOptions(updateSpec); + writeModel = new DeleteManyModel<>((BsonDocument) updateSpec.get("filter"), options); + } else { + return null; + } + return writeModel; + } + + private static String getUpdateType(BsonDocument updateItem) { + return updateItem.keySet().iterator().next(); + } + + // converts the BsonDocument payload of an insert/replace operation into a driver Document + private static Document toBsonDocument(BsonDocument doc) { + if (null == doc) { + return null; + } + return new Document(doc); + } + + protected UpdateOptions parseUpdateOptions(BsonDocument updateSpec) { + final UpdateOptions options = new UpdateOptions(); + if (updateSpec.containsKey("upsert")) { + options.upsert(updateSpec.getBoolean("upsert").getValue()); + } + if (updateSpec.containsKey("arrayFilters")) { + options.arrayFilters((List<Bson>) updateSpec.get("arrayFilters")); + } + if (updateSpec.containsKey("collation")) { + options.collation(parseCollation((BsonDocument) updateSpec.get("collation"))); + } + return options; + } + + protected ReplaceOptions parseReplaceOptions(BsonDocument updateSpec) { + final ReplaceOptions options = new ReplaceOptions(); + if (updateSpec.containsKey("upsert")) { + options.upsert(updateSpec.getBoolean("upsert").getValue()); + } + if (updateSpec.containsKey("collation")) { + options.collation(parseCollation((BsonDocument) updateSpec.get("collation"))); + } + return options; + } + + protected DeleteOptions parseDeleteOptions(BsonDocument updateSpec) { + 
final DeleteOptions options = new DeleteOptions(); + if (updateSpec.containsKey("collation")) { + options.collation(parseCollation((BsonDocument) updateSpec.get("collation"))); + } + return options; + } + + protected Collation parseCollation(BsonDocument collationSpec) { + final Collation.Builder builder = Collation.builder(); + if (collationSpec.containsKey("locale")) { + builder.locale(collationSpec.getString("locale").getValue()); + } + if (collationSpec.containsKey("caseLevel")) { + builder.caseLevel(collationSpec.getBoolean("caseLevel").getValue()); + } + if (collationSpec.containsKey("caseFirst")) { + builder.collationCaseFirst(CollationCaseFirst.fromString(collationSpec.getString("caseFirst").getValue())); + } + if (collationSpec.containsKey("strength")) { + builder.collationStrength(CollationStrength.fromInt(collationSpec.getInt32("strength").getValue())); + } + if (collationSpec.containsKey("numericOrdering")) { + builder.numericOrdering(collationSpec.getBoolean("numericOrdering").getValue()); + } + if (collationSpec.containsKey("alternate")) { + builder.collationAlternate(CollationAlternate.fromString(collationSpec.getString("alternate").getValue())); + } + if (collationSpec.containsKey("maxVariable")) { + builder.collationMaxVariable(CollationMaxVariable.fromString(collationSpec.getString("maxVariable").getValue())); + } + if (collationSpec.containsKey("normalization")) { + builder.normalization(collationSpec.getBoolean("normalization").getValue()); + } + if (collationSpec.containsKey("backwards")) { + builder.backwards(collationSpec.getBoolean("backwards").getValue()); + } + return builder.build(); + } + +} diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 3797ca06211f..5a040eab7b0b 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -21,4 +21,5 @@ org.apache.nifi.processors.mongodb.PutMongo org.apache.nifi.processors.mongodb.PutMongoRecord org.apache.nifi.processors.mongodb.gridfs.DeleteGridFS org.apache.nifi.processors.mongodb.gridfs.FetchGridFS -org.apache.nifi.processors.mongodb.gridfs.PutGridFS \ No newline at end of file +org.apache.nifi.processors.mongodb.gridfs.PutGridFS +org.apache.nifi.processors.mongodb.PutMongoBulkOperations \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.PutMongoBulkOperations/additionalDetails.html b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.PutMongoBulkOperations/additionalDetails.html new file mode 100644 index 000000000000..8d78f68f14ae --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.PutMongoBulkOperations/additionalDetails.html @@ -0,0 +1,45 @@

+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+    <head>
+        <meta charset="utf-8" />
+        <title>PutMongoBulkOperations</title>
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+    </head>
+    <body>
+        <h2>Description:</h2>
+        <p>
+            This processor runs bulk update operations against a MongoDB collection. The FlowFile content is
+            expected to be a JSON array of bulk write operations, using the syntax described in the MongoDB
+            manual for <code>db.collection.bulkWrite</code>.
+        </p>
+        <p>
+            All (currently six) operation types described there can be used. The FlowFile content is passed
+            through unchanged. Combining many operations into a single bulk write saves one round trip per
+            operation and can improve write throughput considerably.
+        </p>
+        <h2>Example:</h2>
+        <p>
+            The following example FlowFile content does two things: it inserts a new document, and it
+            increments <code>hey</code> by 2 in all documents where the value of <code>hey</code> is greater
+            than zero.
+        </p>
+        <pre>
+            <code>
+                [
+                    {"insertOne": {"document": {"ho": 42}}},
+                    {"updateMany": {"filter": {"hey": {"$gt": 0}}, "update": {"$inc": {"hey": 2}}}}
+                ]
+            </code>
+        </pre>
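+        <p>
+            Options for an operation are read from the same JSON object as the operation itself: the patch
+            parses <code>upsert</code> (for update and replace operations), <code>arrayFilters</code>
+            (for update operations), and a <code>collation</code> document (for update, replace, and delete
+            operations). As an illustrative sketch - the field names mirror the MongoDB manual, while the
+            concrete values are made up - an upsert with a case-insensitive collation looks like this:
+        </p>
+        <pre>
+            <code>
+                [
+                    {"updateOne": {
+                        "filter": {"_id": "doc_1"},
+                        "update": {"$set": {"color": "blue"}},
+                        "upsert": true,
+                        "collation": {"locale": "en", "strength": 2}
+                    }}
+                ]
+            </code>
+        </pre>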
+    </body> +</html> \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperationsIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperationsIT.java new file mode 100644 index 000000000000..f8f05b5cb562 --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoBulkOperationsIT.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.mongodb; + +import org.apache.nifi.util.TestRunner; +import org.bson.Document; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class PutMongoBulkOperationsIT extends MongoWriteTestBase { + + @BeforeEach + public void setup() { + super.setup(PutMongoBulkOperations.class); + } + + @Override + @AfterEach + public void teardown() { + super.teardown(); + } + + @Test + public void testBulkWriteInsert() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + + StringBuffer doc = new StringBuffer(); + doc.append("["); + for (int i = 0; i < DOCUMENTS.size(); i++) { + if (i > 0) { + doc.append(", "); + } + doc.append("{\"insertOne\": {\"document\": "); + doc.append(DOCUMENTS.get(i).toJson()); + doc.append("}}"); + } + doc.append("]"); + runner.enqueue(doc.toString()); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + assertEquals(3, collection.countDocuments()); + Document doc1 = collection.find(new Document().append("_id", "doc_1")).first(); + assertNotNull(doc1); + assertEquals(3, doc1.getInteger("c", 0)); + } + + @Test + public void testBulkWriteUpdateOne() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + + collection.insertMany(DOCUMENTS); + + runner.enqueue("[{\"updateOne\": {\"filter\": {\"_id\": {\"$in\": [\"doc_1\", \"doc_2\"]}}, \"update\": {\"$set\": {\"z\": 42}}}}]"); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + assertEquals(1, collection.countDocuments(new Document().append("z", 42))); + } + + @Test + public void testBulkWriteUpdateMany() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + + collection.insertMany(DOCUMENTS); + + runner.enqueue("[{\"updateMany\": {\"filter\": {\"_id\": 
{\"$in\": [\"doc_1\", \"doc_2\"]}}, \"update\": {\"$set\": {\"z\": 42}}}}]"); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + assertEquals(2, collection.countDocuments(new Document().append("z", 42))); + } + + @Test + public void testBulkWriteReplaceOne() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + + collection.insertMany(DOCUMENTS); + + runner.enqueue("[{\"replaceOne\": {\"filter\": {\"_id\": \"doc_1\"}, \"replacement\": {\"_id\": \"doc_1\", \"z\": 42}}}]"); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + assertEquals(1, collection.countDocuments(new Document().append("z", 42))); + Document doc1 = collection.find(new Document().append("_id", "doc_1")).first(); + assertNotNull(doc1); + assertEquals(42, doc1.getInteger("z", 0)); + assertNull(doc1.get("a")); + } + + @Test + public void testBulkWriteDeleteOne() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + + collection.insertMany(DOCUMENTS); + + runner.enqueue("[{\"deleteOne\": {\"filter\": {\"_id\": {\"$in\": [\"doc_1\", \"doc_2\"]}}}}]"); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + assertEquals(2, collection.countDocuments()); + assertEquals(0, collection.countDocuments(new Document().append("z", 42))); + } + + @Test + public void testBulkWriteDeleteMany() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + + collection.insertMany(DOCUMENTS); + + runner.enqueue("[{\"deleteMany\": {\"filter\": {\"_id\": {\"$in\": [\"doc_1\", \"doc_2\"]}}}}]"); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 0); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 1); + + assertEquals(1, collection.countDocuments()); + assertEquals(0, collection.countDocuments(new Document().append("z", 42))); + } + + @Test + public void testInvalid() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + + runner.enqueue("[{\"whatever\": {\"filter\": {\"_id\": {\"$in\": [\"doc_1\", \"doc_2\"]}}}}]"); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 1); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 0); + } + + @Test + public void testBulkWriteOrderedAsIs() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + runner.setProperty(PutMongoBulkOperations.ORDERED, "true"); // default, still + + StringBuffer doc = new StringBuffer(); + doc.append("["); + // inserting same ID twice fails w/in mongo, not before, so we can really test transactions and ordering + doc.append("{\"insertOne\": {\"document\": {\"_id\": \"doc_1\"}}},{\"insertOne\": {\"document\": "); + doc.append(DOCUMENTS.get(0).toJson()); + doc.append("}}]"); + runner.enqueue(doc.toString()); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 1); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 0); + + assertEquals(1, collection.countDocuments()); + } + + @Test + public void testBulkWriteOrderedNoTransaction() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + runner.setProperty(PutMongoBulkOperations.ORDERED, "true"); // default, still + runner.setProperty(PutMongoBulkOperations.TRANSACTIONS_ENABLED, "false"); // default, still + + StringBuffer doc = new StringBuffer(); + doc.append("["); + 
doc.append("{\"insertOne\": {\"document\": "); + doc.append(DOCUMENTS.get(0).toJson()); + // inserting same ID twice fails w/in mongo, not before, so we can really test transactions and ordering + doc.append("}}, {\"insertOne\": {\"document\": {\"_id\": \"doc_1\"}}}]"); + runner.enqueue(doc.toString()); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 1); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 0); + + assertEquals(1, collection.countDocuments()); + } + + @Test + public void testBulkWriteOrderedWithTransaction() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + runner.setProperty(PutMongoBulkOperations.ORDERED, "true"); // default, still + runner.setProperty(PutMongoBulkOperations.TRANSACTIONS_ENABLED, "true"); + + StringBuffer doc = new StringBuffer(); + doc.append("["); + doc.append("{\"insertOne\": {\"document\": "); + doc.append(DOCUMENTS.get(0).toJson()); + // inserting same ID twice fails w/in mongo, not before, so we can really test transactions and ordering + doc.append("}}, {\"insertOne\": {\"document\": {\"_id\": \"doc_1\"}}}]"); + runner.enqueue(doc.toString()); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 1); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 0); + + assertEquals(0, collection.countDocuments()); + } + + @Test + public void testBulkWriteUnordered() throws Exception { + final TestRunner runner = init(PutMongoBulkOperations.class); + runner.setProperty(PutMongoBulkOperations.ORDERED, "false"); + + StringBuffer doc = new StringBuffer(); + doc.append("["); + // inserting same ID twice fails w/in mongo, not before, so we can really test transactions and ordering + doc.append("{\"insertOne\": {\"document\": {\"_id\": \"doc_1\"}}},{\"insertOne\": {\"document\": "); + doc.append(DOCUMENTS.get(0).toJson()); + doc.append("}}]"); + runner.enqueue(doc.toString()); + runner.run(); + runner.assertTransferCount(PutMongo.REL_FAILURE, 1); + runner.assertTransferCount(PutMongo.REL_SUCCESS, 0); + + assertEquals(1, collection.countDocuments()); + } + +} diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java index 3347eedf4f5d..bf53799f17ec 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java @@ -19,6 +19,7 @@ import com.mongodb.ConnectionString; import com.mongodb.MongoClientSettings; +import com.mongodb.client.ClientSession; import com.mongodb.client.MongoClient; import com.mongodb.WriteConcern; import com.mongodb.client.MongoClients; @@ -196,6 +197,11 @@ public MongoDatabase getDatabase(String name) { return mongoClient.getDatabase(name); } + @Override + public ClientSession startSession() { + return mongoClient.startSession(); + } + @Override public String getURI() { return uri;