
NIFI-11129 - adding PutMongoBulk processor to use the bulkWrite API #6918

Closed

Conversation

sebastianrothbucher

NIFI-11129 - adding PutMongoBulk processor to use the bulkWrite API for way more efficient mass updates or inserts

Summary

NIFI-11129

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

Licensing

  • n/a New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • n/a New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • n/a (but tested additionalDetails.html) Documentation formatting appears as expected in rendered files

@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Writes the contents of a FlowFile to MongoDB as bulk-update")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
public class PutMongoBulk extends AbstractMongoProcessor {
Contributor

Two questions:

  1. Do we need a minimum driver version to use this?
  2. Have you considered breaking this functionality up, so it can efficiently use the bulkWrite API with the record API for bulk ingestion as well as in a generic bulk write operation processor?

Author

Hi Mike,
thanks for reviewing!
re 1.: I tested with the Mongo driver bundled with 1.13 at the time and it worked; I just had to make some adjustments for the driver bundled with 1.20-SNAPSHOT. Concerning operations: all Mongo 4.2+ (i.e. all active versions) support bulkWrite. I'd say we should be fine.
re 2.: I deliberately did not use records, to allow freeform updates with $set, $inc, etc. as operations. Of course, you could define an Avro schema for this, but I think it's a stretch. The original intention was to use it when you need more fine-grained control of updates vis-a-vis PutMongoRecord. Does that make sense? I could of course expand the additional description HTML...
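
For illustration, a minimal sketch of the kind of freeform operations the MongoDB Java driver's bulkWrite API accepts; the field names and the collection variable are placeholders, not taken from this PR:

import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.Updates;
import com.mongodb.client.model.WriteModel;
import org.bson.Document;
import java.util.List;

// Sketch only: assumes a MongoCollection<Document> named "collection"
List<WriteModel<Document>> ops = List.of(
        new InsertOneModel<>(new Document("sku", "abc").append("qty", 10)),
        new UpdateOneModel<>(
                Filters.eq("sku", "def"),
                Updates.combine(Updates.set("status", "active"), Updates.inc("qty", 5))));
// all operations go to the server in a single call
BulkWriteResult result = collection.bulkWrite(ops);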

Contributor

#2 makes sense. Is the bulkWrite API supposed to be faster and more flexible, or just more flexible? Based on your response, if it's the former, then that has implications for PutMongoRecord too.

Author

PutMongoRecord uses the same bulkWrite API under the hood, so PutMongoBulk would be more flexible and equally fast compared to PutMongoRecord.

@sebastianrothbucher
Author

anything I can do to get the ball rolling again?

@MikeThomsen
Contributor

@sebastianrothbucher yeah, I'm adding this to my TODO to get a review going.

@exceptionfactory left a comment (Contributor)

Thanks for the contribution @sebastianrothbucher. I noticed some implementation concerns, but before proceeding to review the code details, it would be helpful to evaluate the scope of this Processor.

Although the documentation makes it clear that the input JSON needs to match MongoDB requirements, the generalized nature of the Processor seems like it could be confusing. The Put prefix generally signifies some kind of insert operation, but the implementation allows insert, update, replace, or delete. Although having a general Processor can be helpful, the scope seems too open-ended in this situation. One option is renaming the Processor to something like ExecuteMongo, along the lines of ExecuteSQL. On the other hand, the conditionals inside the onTrigger method perform significantly different operations. Is the goal to support a combination of insert, update, and delete operations in a single execution? This is generally counter to most other Processors.

The Mongo Client Service should also be the preferred configuration method.

With that background, I think it would be helpful to refine the scope and target use case for this Processor.

@MikeThomsen left a comment (Contributor)

Sorry it took a while to get back to you. Left you with a bunch of feedback. Good first pass at this processor.

@MikeThomsen
Contributor

@exceptionfactory

> On the other hand, the conditionals inside the onTrigger method perform significantly different operations. Is the goal to support a combination of insert, update, and delete operations in a single execution? This is generally counter to most other Processors.

We already have something similar in the Elasticsearch package where you can do a very complex set of bulk operations from a single flowfile with PutElasticsearchRecord.

@exceptionfactory
Contributor

> @exceptionfactory
>
> > On the other hand, the conditionals inside the onTrigger method perform significantly different operations. Is the goal to support a combination of insert, update, and delete operations in a single execution? This is generally counter to most other Processors.
>
> We already have something similar in the Elasticsearch package where you can do a very complex set of bulk operations from a single flowfile with PutElasticsearchRecord.

Thanks for the reference @MikeThomsen, that is a helpful comparison. With that in mind, does it make sense to align this Processor to something more record-oriented, as opposed to requiring JSON?

@MikeThomsen
Contributor

> Thanks for the reference @MikeThomsen, that is a helpful comparison. With that in mind, does it make sense to align this Processor to something more record-oriented, as opposed to requiring JSON?

No, I think it's good the way it is, and it doesn't bill itself as a record-aware processor. Maybe add "Operations" to the end of the name to further clarify that it is calling the bulkWrite API, but as-is I think it fills a useful Mongo niche.

@exceptionfactory left a comment (Contributor)

Including Operations in the Processor name sounds like it would be helpful.

The comments about issues with the MongoDB Client Service need to be resolved, in addition to the JUnit 4 test references. If these issues can be addressed, we should be able to move forward with the review.

@sebastianrothbucher
Author

thanks both of you, I'll take care of it

@sebastianrothbucher force-pushed the nifi-11129 branch 2 times, most recently from 440a232 to 0949341 on July 11, 2023 18:04
@sebastianrothbucher
Author

Different processors are failing on otherwise identical code (I did rebase before pushing); unfortunately I have no Windows box to debug on.

@exceptionfactory
Contributor

> Different processors are failing on otherwise identical code (I did rebase before pushing); unfortunately I have no Windows box to debug on.

Thanks @sebastianrothbucher, the test failure on Windows is unrelated, I restarted the workflow to give it another try.

@joewitt
Contributor

joewitt commented Oct 1, 2023

@exceptionfactory Seems like the build checks out fine. Was that the last concern?

@exceptionfactory left a comment (Contributor)

@joewitt and @sebastianrothbucher I went through the code in more detail and noted several recommendations. After addressing these issues, I think this should be ready to go forward.

@sebastianrothbucher
Author

Thanks! I addressed them all (or at least left an excuse in the comments ;-) )

@sebastianrothbucher
Author

The test failure is due to a newer Mongo version - working on it.

@exceptionfactory left a comment (Contributor)

Thanks for your patience on this pull request @sebastianrothbucher. Unfortunately there are still some implementation problems that I noted in the latest set of comments. They appear to have been missed in the course of refactoring, which can happen.

To make it easier to review subsequent changes, please put new changes in a new commit, even when rebasing, so that it is easier to follow the changes from one set of comments to another.

Comment on lines 112 to 121
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.addAll(descriptors);
_propertyDescriptors.add(ORDERED);
_propertyDescriptors.add(TRANSACTIONS_ENABLED);
_propertyDescriptors.add(CHARACTER_SET);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);

final Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(_relationships);
Contributor

These declarations can be streamlined and replaced with List.of() and Set.of()

Author

I did what PutMongo also does, to stay roughly in sync; for one, the shorthand does not work directly because of line 113; and to me, it's even uglier to change style midway. Up to you, I can do it either way.

Contributor

Thanks for the response. As this is a new Processor, this is a good opportunity to improve the convention, so moving to List.of() and Set.of() declarations, as opposed to the static initializer and underscored collection variables, would be helpful.
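
To illustrate the suggestion, one possible streamlined form of the declarations (a sketch only; it assumes the parent descriptors list and the constants shown in the snippet above, plus java.util.stream imports):

// Sketch only: immutable collections instead of the underscored temporaries
static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Stream.concat(
        descriptors.stream(),                                    // inherited from the abstract processor
        Stream.of(ORDERED, TRANSACTIONS_ENABLED, CHARACTER_SET)  // processor-specific properties
).collect(Collectors.toUnmodifiableList());

static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);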

.defaultValue("true")
.build();

static final PropertyDescriptor TRANSACTIONS_ENABLED = new PropertyDescriptor.Builder()
Contributor

Reviewing the implementation, is there value in making this a configurable property? On the one hand, removing the property avoids changing the client service interface and introducing the new method. On the other hand, should transactions always be used?

Author

Not really, I almost never use them, to be honest, because most of the time the document is the scope of work. I tried to stick to the driver here as well.
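
For reference, a minimal sketch of what the transactional path looks like at the driver level; mongoClient, collection, and writeModels are placeholders, not names from this PR:

// Sketch only: wraps a bulkWrite in a client session transaction
try (ClientSession session = mongoClient.startSession()) {
    session.startTransaction();
    try {
        collection.bulkWrite(session, writeModels);  // apply the whole batch in one transaction
        session.commitTransaction();
    } catch (MongoException e) {
        session.abortTransaction();  // roll back the whole batch on failure
        throw e;
    }
}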

Contributor

Thanks for the reply. If you don't think it is necessary to support transactions for now, it seems like it would be better to remove this property, and the method from MongoDBClientService. Removing the option would simplify both the configuration and implementation. It looks like it wouldn't be difficult to add later if needed, but if you don't see a common need, then this might be a case of "less is more" for the initial version.

…rite API for way more efficient mass updates or inserts
@sebastianrothbucher
Author

thanks for checking!

…rite API for way more efficient mass updates or inserts - finishing touches
@sebastianrothbucher
Author

2nd failure is a Heisenbug ;-)
think we're good for this Processor

@exceptionfactory
Contributor

exceptionfactory commented Nov 7, 2023

> 2nd failure is a Heisenbug ;-) think we're good for this Processor

Recent build changes introduced automated running of integration tests, and it looks like it is failing on the new IT:

Error:  Failures: 
Error:    PutMongoBulkOperationsIT.testBulkWriteInsert:65 expected: <3> but was: <0>

A number of the existing integration tests are being skipped because they are not in a position to run in an automated workflow, or they are otherwise unreliable.

In this case, the new integration test needs to be reliable, or it should not be included. Does it work for you on a local build? GitHub runners are less powerful, which can introduce certain unexpected behaviors, but this is something that will need to be addressed in this pull request, one way or the other, for the integration test.

@sebastianrothbucher
Author

I still have to look. I ran the tests before each push - and they just worked. Also, from the error messages it seems like something gets stuck due to load. Let me get back to you.

…rite API for way more efficient mass updates or inserts - test stability
@sebastianrothbucher
Author

I don't think it was something I did specifically; it's just that Mongo might not yet have committed the deletion of the DB in question. I nonetheless did two things: 1.) use different test docs in different tests (which should not be necessary, but gives additional safety), and 2.) add more deletion and an explicit close to the Mongo write tests - which hopefully got the failure rate from 0.1% down to 0.001% or so.

I re-ran the full test suite - and I got some other spurious errors. One in GridFS I cannot explain at all (unless the CI somehow overrides the Mongo Docker image and we use a different MongoDB on GitHub CI; I just didn't see it).

The other issue is in GetMongoIT. We seem to overwrite FlowFile attributes with "environment variables". It looks to me like the following change makes it a little more stable. I didn't include it in the commit; maybe it's still useful:

diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
index 87a0ca3d56..58e9adea6f 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java
@@ -502,10 +502,9 @@ public class GetMongoIT extends AbstractMongoIT {
             db.getCollection(collections[x])
                 .insertOne(new Document().append("msg", "Hello, World"));
 
-            Map<String, String> attrs = new HashMap<>();
-            attrs.put("db", dbs[x]);
-            attrs.put("collection", collections[x]);
-            runner.enqueue(query, attrs);
+            runner.setEnvironmentVariableValue("db", dbs[x]);
+            runner.setEnvironmentVariableValue("collection", collections[x]);
+            runner.enqueue(query);
             runner.run();
 
             db.drop();

@exceptionfactory left a comment (Contributor)

Thanks for the latest round of updates, I noted two more comments, and based on your response and updates, I think this should be ready to go.


…rite API for way more efficient mass updates or inserts - final review feedback
@sebastianrothbucher
Author

all right - addressed both final issues

@exceptionfactory left a comment (Contributor)

Thanks again for your patience and perseverance on this new Processor @sebastianrothbucher. After one more review, I noticed that the Ordered property can be set as required because it should always have a value, but I will make that adjustment when merging. The latest version looks good, thanks again! +1 merging

@sebastianrothbucher
Author

+1 as well, thanks!
