Skip to content

Commit

Permalink
NIFI-11129 - adding PutMongoBulkOperations processor to use the bulkW…
Browse files Browse the repository at this point in the history
…rite API for way more efficient mass updates or inserts - finishing touches
  • Loading branch information
sebastianrothbucher committed Nov 7, 2023
1 parent a93593a commit 5960536
Showing 1 changed file with 6 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.bson.conversions.Bson;
import org.bson.json.JsonReader;

import java.io.ByteArrayOutputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.Charset;
Expand All @@ -80,7 +79,7 @@ public class PutMongoBulkOperations extends AbstractMongoProcessor {
static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
.name("Ordered")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.description("Ordered execution of bulk-writes & break on error - otherwise arbitrary order & continue on error")
.description("Ordered execution of bulk-writes and break on error - otherwise arbitrary order and continue on error")
.required(false)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
Expand Down Expand Up @@ -145,15 +144,12 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
ClientSession clientSession = null;
try {
final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern);
// Read the contents of the FlowFile into a byte array
ByteArrayOutputStream os = new ByteArrayOutputStream();
session.exportTo(flowFile, os);

// parse
final BsonArrayCodec arrayCodec = new BsonArrayCodec();
final DecoderContext decoderContext = DecoderContext.builder().build();
final BsonArray updateItems;
try (final Reader reader = new InputStreamReader(session.read(flowFile))) {
try (final Reader reader = new InputStreamReader(session.read(flowFile), charset)) {
updateItems = arrayCodec.decode(new JsonReader(reader), decoderContext);
}

Expand Down Expand Up @@ -199,15 +195,15 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
getLogger().error("Failed to bulk-update {} into MongoDB", flowFile, e);
session.transfer(flowFile, REL_FAILURE);
context.yield();
try {
if (clientSession != null) {
if (clientSession != null) {
try {
if (clientSession.hasActiveTransaction()) {
clientSession.abortTransaction();
}
clientSession.close();
} catch (Exception ee) {
getLogger().warn("Cannot rollback client session", ee); // (but no further action)
}
} catch (Exception ee) {
getLogger().warn("Cannot rollback client session", ee); // (but no further action)
}
}
}
Expand Down

0 comments on commit 5960536

Please sign in to comment.