Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-12093] Overhaul ElasticsearchIO.Write #14347

Merged
merged 12 commits into from
May 27, 2021

Conversation

egalpin
Copy link
Member

@egalpin egalpin commented Mar 26, 2021

This change set represents a rather large (and backward compatible) change to the way ElastichsearchIO.Write operates. Presently, the Write transform has 2 responsibilities:

  1. Convert input documents into Bulk API entities, serializing based on user settings (partial update, delete, upsert, etc) -> DocToBulk
  2. Batch the converted Bulk API entities together and interface with the target ES cluster -> BulkIO

This PR aims to separate these 2 responsibilities into discrete PTransforms to allow for greater flexibility while also maintaining the convenience of the Write transform to perform both document conversion and IO serially. Examples of how the flexibility of separate transforms could be used:

  1. Unit testing. It becomes trivial for pipeline developers to ensure that output Bulk API entities for a given set of inputs will produce an expected result, without the need for an available Elasticsearch cluster.
  2. Flexible options for data backup. Serialized Bulk API entities can be forked and sent to both Elasticsearch and a data lake.
  3. Mirroring data to multiple clusters. Presently, mirroring data to multiple clusters would require duplicate computation.
  4. Better batching with input streams in one job. A job may produce multiple "shapes" of Bulk API entities based on multiple input types, and then "fan-in" all serialized Bulk entities into a single BulkIO transform to improve batching semantics.
  5. Decoupled jobs. Corollary to (4) above. Job(s) could be made to produce Bulk entities and then publish them to a message bus. A distinct job could consume from that message bus and solely be responsible for IO with the target cluster(s).
  6. Easier support for multiple BulkIO semantics.

Expanding on point (6), this PR also introduces a new (optional) way to batch entities for bulk requests: Stateful Processing. Presently, Bulk request size is limited by the lesser of Runner bundle size and maxBatchSize user setting. In my experience, bundle sizes are often very small, and can be a small as 1 or 2. When that’s the case, it means Bulk requests contain only 1 or 2 documents, and it’s effectively the same as not using the Bulk API at all. BulkIOStatefulFn is made to be compatible with GroupIntoBatches which will use entity count and (optionally) elapsed time to create batches much closer to the maxBatchSize setting to improve throughput.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

Post-Commit Tests Status (on master branch)

Lang SDK ULR Dataflow Flink Samza Spark Twister2
Go --- --- Build Status --- Build Status ---
Java Build Status Build Status Build Status
Build Status
Build Status
Build Status
Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status
Build Status
Build Status
Build Status
Python Build Status
Build Status
Build Status
--- Build Status
Build Status
Build Status
Build Status
Build Status
--- Build Status ---
XLang Build Status --- Build Status Build Status --- Build Status ---

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status --- --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@egalpin
Copy link
Member Author

egalpin commented Mar 26, 2021

As this is my first time contributing, I'm not sure exactly who to select as a reviewer. I see @echauchot in the git blame a lot for ElasticsearchIO.java, so I'll start there? 😂

@echauchot
Copy link
Contributor

Sure, I'll be happy to review. Thanks for your contribution !

@egalpin
Copy link
Member Author

egalpin commented Mar 26, 2021

I have the local dev env setup by using start-build-env.sh from this repo, but I'm still working towards running elasticsearch-tests. Any pointers would be appreciated if anyone has time 🙂

@egalpin egalpin force-pushed the elasticsearchio-support-stateful branch from e4f2f52 to ec6b1c9 Compare March 30, 2021 14:14
@pabloem pabloem requested a review from echauchot April 1, 2021 21:13
@echauchot
Copy link
Contributor

@egalpin thanks for your contribution. I'm sorry I lack time a lot lately. In the meantime can you:

  • make the build pass (precommit fail)
  • as it is rather large contribution : please open a ticket with details and rename PR name.

@egalpin egalpin changed the title Overhaul ElasticsearchIO.Write [BEAM-12093] Overhaul ElasticsearchIO.Write Apr 5, 2021
@egalpin egalpin force-pushed the elasticsearchio-support-stateful branch from ed73351 to ff97fd9 Compare April 5, 2021 13:48
@egalpin
Copy link
Member Author

egalpin commented Apr 5, 2021

@echauchot I've made a jira ticket and linked it. I'm working on getting the build to pass but struggling a bit with trying to determine the cause of the new errors in the Java PreCommit build. All the warnings have to do with a single Kotlin example which seems far removed from the changes here. I'll keep poking away at it though. I've found the full output from the build and can see some compilation errors. Working on those.

@egalpin egalpin force-pushed the elasticsearchio-support-stateful branch 7 times, most recently from 9a6cd5d to 15b128b Compare April 8, 2021 15:13
@egalpin
Copy link
Member Author

egalpin commented Apr 8, 2021

@echauchot FYI build is passing now 🙂

@pabloem
Copy link
Member

pabloem commented Apr 8, 2021

@echauchot let me know if you can take a look at this, and if not I can help you find more reviewers : )

@egalpin
Copy link
Member Author

egalpin commented Apr 8, 2021

@pabloem fyi I messaged echauchot on Apache beam slack previously and we had arranged to chat about tests together tomorrow. I wanted to let them know that I’d gotten past my blockers with respect to tests, but I definitely did not intend to apply pressure or name+shame or anything like that.

@pabloem
Copy link
Member

pabloem commented Apr 8, 2021

oh I also didn't want to pressure anyone : ) I'm just checking. thanks for the update!

@egalpin
Copy link
Member Author

egalpin commented Apr 8, 2021

Haha thanks! I appreciate it. I wanted to be sure I was respecting others' busy schedules ❤️

@echauchot
Copy link
Contributor

@egalpin starting first round of review sorry for the delay

@echauchot
Copy link
Contributor

@timrobertson100 you might be interested. @ludovic-boutros you were thinking about changing the overall arhictecture of the IO and introduce a testContainer based test framework here: https://lists.apache.org/thread.html/reb68f37c435995a64ded19100e09dfc31c5cf6227feae16494226100%40%3Cdev.beam.apache.org%3E
any comment ?

@echauchot
Copy link
Contributor

@echauchot let me know if you can take a look at this, and if not I can help you find more reviewers : )

@pabloem  @egalpin I'll do an overall review but I'd need another reviewer to do the in-depth review because:

  • I'm busy on many other things lately
  • I coded support for ES 2, ES 5, reviewed 6 and coded ES7. I'd like to pass the torch :)

@egalpin
Copy link
Member Author

egalpin commented Apr 12, 2021

Thanks @echauchot for sharing that thread. I really like a lot of what @ludovic-boutros proposed and have many shared goals; in particular, implementing the pattern where successful and failed writes can be returned via MultiOutputReceiver. I believe the multi-output pattern could fit within the current IO with some additional effort (since order of request and response entities is guaranteed[1]), and I had planned to do that as a follow-up so as to not introduce even more changes in one PR.

At the same time, I also see the argument that in many ways using the low-level client results in "reinventing the wheel" for a number of features (with good justification, IMO, of enabling cross-version support).

I'd be very willing to contribute to brainstorming (and implementation once we reach that point) if others are open to that.

[1] https://discuss.elastic.co/t/order-of-actions-in-bulk-api-via-http-between-request-and-response-is-guaranteed/122499/2

@egalpin
Copy link
Member Author

egalpin commented Apr 13, 2021

Run Java PreCommit

@ludovic-boutros
Copy link

@egalpin @echauchot I made a quick review, but, well, sadly I don't have enough time these months to go deeper on the subject.
We are using the module I shared in production for months without any issue.
The development of a new multi module component is currently in stand by with one module per Elasticsearch version and a top level abstraction.
It's not finished yet and with the Covid situation I had to refocus on other projects. That means that I will not get any time to really go further soon. I'm available to share on this with you (Slack ?).
I think political decision should be taken first before taking the direction I proposed. But it's not the place to have this discussion.

@echauchot
Copy link
Contributor

This change set represents a rather large (and backward compatible) change to the way ElastichsearchIO.Write operates. Presently, the Write transform has 2 responsibilities:

  1. Convert input documents into Bulk API entities, serializing based on user settings (partial update, delete, upsert, etc) -> DocToBulk
  2. Batch the converted Bulk API entities together and interface with the target ES cluster -> BulkIO

This PR aims to separate these 2 responsibilities into discrete PTransforms to allow for greater flexibility while also maintaining the convenience of the Write transform to perform both document conversion and IO serially. Examples of how the flexibility of separate transforms could be used:

  1. Unit testing. It becomes trivial for pipeline developers to ensure that output Bulk API entities for a given set of inputs will produce an expected result, without the need for an available Elasticsearch cluster.
  2. Flexible options for data backup. Serialized Bulk API entities can be forked and sent to both Elasticsearch and a data lake.
  3. Mirroring data to multiple clusters. Presently, mirroring data to multiple clusters would require duplicate computation.
  4. Better batching with input streams in one job. A job may produce multiple "shapes" of Bulk API entities based on multiple input types, and then "fan-in" all serialized Bulk entities into a single BulkIO transform to improve batching semantics.
  5. Decoupled jobs. Corollary to (4) above. Job(s) could be made to produce Bulk entities and then publish them to a message bus. A distinct job could consume from that message bus and solely be responsible for IO with the target cluster(s).
  6. Easier support for multiple BulkIO semantics.

=> Reading at the overall design goals, it looks very promising and a good analysis of the missing properties of the curent architecture ! Thanks !

Expanding on point (6), this PR also introduces a new (optional) way to batch entities for bulk requests: Stateful Processing. Presently, Bulk request size is limited by the lesser of Runner bundle size and maxBatchSize user setting. In my experience, bundle sizes are often very small, and can be a small as 1 or 2. When that’s the case, it means Bulk requests contain only 1 or 2 documents, and it’s effectively the same as not using the Bulk API at all. BulkIOStatefulFn is made to be compatible with GroupIntoBatches which will use entity count and (optionally) elapsed time to create batches much closer to the maxBatchSize setting to improve throughput.

=> True that very small batches can exist for example Flink being a streaming oriented platform, Flink runner tends to create very small Beam bundles. So, when the bundle is finished processing (finishBundle is called), the ES bulk request is sent leading to small ES bulk. Leveraging GroupIntoBatches that creates trans-bundle groups and still respect Beam semantics (windowing, bundle retries etc...) is a very good idea.

Copy link
Contributor

@echauchot echauchot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good work Evan ! Thanks.

I like the analysis of the missing features and all the improvements you gave !
I did a pretty in depth review after all but I no more have time. @pabloem as you offered help, could you please do the other rounds of review and merge when ok ?

Besides, Evan, as you know ES very well, and you seem to be interested in contributing. Would you be interested in putting yourself in ES Owners file and jira ES label ?

@egalpin
Copy link
Member Author

egalpin commented Apr 20, 2021

@echauchot Thanks for the review, I'll work my way through your comments and suggestions.

Besides, Evan, as you know ES very well, and you seem to be interested in contributing. Would you be interested in putting yourself in ES Owners file and jira ES label ?

I'd be very happy to 👍 I've added myself to the ES owners file now, happy to lend a hand reviewing! Thanks 🙂

With respect to Jira, could you please add appropriate permissions for me to either assign myself to the ES label, or assign me to the label yourself if that is the preferred workflow. I have an account on issues.apache.org/jira but only with permission to create tickets I believe.

@egalpin egalpin requested a review from echauchot May 5, 2021 04:57
@egalpin
Copy link
Member Author

egalpin commented May 6, 2021

@echauchot added coverage for the methods you mentioned. Anything else outstanding? 🙂

Copy link
Contributor

@echauchot echauchot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only minor changes left. I think when you address them, I could hit the merge button provided the tests pass.

@egalpin
Copy link
Member Author

egalpin commented May 12, 2021

Run Java PreCommit

2 similar comments
@egalpin
Copy link
Member Author

egalpin commented May 12, 2021

Run Java PreCommit

@egalpin
Copy link
Member Author

egalpin commented May 12, 2021

Run Java PreCommit

@egalpin egalpin requested a review from echauchot May 17, 2021 12:19
@egalpin
Copy link
Member Author

egalpin commented May 19, 2021

Ready to roll! 🙂

@echauchot
Copy link
Contributor

@egalpin seems good ! thanks ! I just triggered the build. it's like fort knox now on resources consumption 😄

Copy link
Contributor

@echauchot echauchot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@egalpin ready to merge. only the 2 javadoc (assert numdocs/num scientits) fixes and the not needed gbk in parallel test (+consequent simplification of assertion function) and we're good to merge.

@egalpin
Copy link
Member Author

egalpin commented May 25, 2021

@echauchot all set for a final look-over 🙂

@egalpin
Copy link
Member Author

egalpin commented May 25, 2021

Run Java PreCommit

2 similar comments
@egalpin
Copy link
Member Author

egalpin commented May 25, 2021

Run Java PreCommit

@egalpin
Copy link
Member Author

egalpin commented May 25, 2021

Run Java PreCommit

Copy link
Contributor

@echauchot echauchot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM,
Thanks for you great and hard work Evan ! And also thanks for your patience.
As stated in the guidelines, I'll squash the review commits into the first commit and merge.

@echauchot echauchot merged commit 4f1f1c1 into apache:master May 27, 2021
@egalpin
Copy link
Member Author

egalpin commented May 27, 2021

Thanks Etienne for all of your reviewing efforts, and your warm welcome to Beam! 😄

@mattwelke
Copy link

Came across this while lurking. Where would one find docs on how to use ElasticsearchIO, including more advanced features like this? I saw some examples on the Beam site, but nothing specific to each source/sink.

@egalpin
Copy link
Member Author

egalpin commented Jan 22, 2022

@mattwelke The javadoc[1] that is generated has some description and examples, though admittedly not as fully descriptive as it could be. I would welcome any additional examples or documentation added by others, and will keep in mind to add more examples when time permits!

Do you have any specific questions or a use case you would like help determining how to best use this IO? I might suggest that we move the conversation to the user mailing list or slack[2] so that others could more easily benefit from our conversation 🙂

[1] https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.html

[2] https://beam.apache.org/community/join-beam/

@timrobertson100
Copy link
Contributor

@mattwelke
Copy link

Those are a good start, thanks. For any more discussion, I'll use the mailing list or Slack.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants